فهرست منبع

Merge pull request #6526 from emqx/release-5.0-beta.3

Sync 5.0-beta.3 fixes into master
tigercl 4 سال پیش
والد
کامیت
41694b7b34

+ 1 - 1
Makefile

@@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-2:23.3.4.9-3-alpine3
 export EMQX_DEFAULT_RUNNER = alpine:3.14
 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
 export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
-export EMQX_DASHBOARD_VERSION ?= v0.8.0
+export EMQX_DASHBOARD_VERSION ?= v0.10.0
 export DOCKERFILE := deploy/docker/Dockerfile
 export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing
 ifeq ($(OS),Windows_NT)

+ 1 - 0
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -194,6 +194,7 @@ format(Traces) ->
               end, Traces).
 
 init([]) ->
+    ok = mria:wait_for_tables([?TRACE]),
     erlang:process_flag(trap_exit, true),
     OriginLogLevel = emqx_logger:get_primary_log_level(),
     ok = filelib:ensure_dir(trace_dir()),

+ 6 - 6
apps/emqx_dashboard/src/emqx_dashboard_collection.erl

@@ -22,7 +22,7 @@
 
 -export([get_collect/0]).
 
--export([get_local_time/0]).
+-export([get_universal_epoch/0]).
 
 -boot_mnesia({mnesia, [boot]}).
 
@@ -108,7 +108,7 @@ handle_info(collect, State = #{count := Count, collect := Collect, temp_collect
 
 handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) ->
     timer(?CLEAR_INTERVAL, clear_expire_data),
-    T1 = get_local_time(),
+    T1 = get_universal_epoch(),
     Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end),
     Collects = ets:select(?TAB_COLLECT, Spec),
     lists:foreach(fun(Collect) ->
@@ -161,7 +161,7 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
                diff(Received, Received0),
                diff(Sent, Sent0),
                diff(Dropped, Dropped0)},
-    Ts = get_local_time(),
+    Ts = get_universal_epoch(),
     {atomic, ok} = mria:transaction(mria:local_content_shard(),
                                     fun mnesia:write/3,
                                     [ ?TAB_COLLECT
@@ -179,8 +179,8 @@ timer(Secs, Msg) ->
     erlang:send_after(Secs, self(), Msg).
 
 get_today_remaining_seconds() ->
-    ?CLEAR_INTERVAL - (get_local_time() rem ?CLEAR_INTERVAL).
+    ?CLEAR_INTERVAL - (get_universal_epoch() rem ?CLEAR_INTERVAL).
 
-get_local_time() ->
-    (calendar:datetime_to_gregorian_seconds(calendar:local_time()) -
+get_universal_epoch() ->
+    (calendar:datetime_to_gregorian_seconds(calendar:universal_time()) -
         calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})).

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -278,7 +278,7 @@ sampling(Node, Counter) ->
     rpc:call(Node, ?MODULE, sampling, [Node, Counter]).
 
 select_data() ->
-    Time = emqx_dashboard_collection:get_local_time() - 7200000,
+    Time = emqx_dashboard_collection:get_universal_epoch() - 7200000,
     ets:select(?TAB_COLLECT, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]).
 
 format(Collects) ->

+ 8 - 2
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -211,11 +211,16 @@ check_request_body(#{body := Body}, Schema, Module, CheckFun, true) ->
 %%                   {good_nest_2, mk(ref(?MODULE, good_ref), #{})}
 %%                ]}
 %% ]
-check_request_body(#{body := Body}, Spec, _Module, CheckFun, false) ->
+check_request_body(#{body := Body}, Spec, _Module, CheckFun, false)when is_list(Spec) ->
     lists:foldl(fun({Name, Type}, Acc) ->
         Schema = ?INIT_SCHEMA#{roots => [{Name, Type}]},
         maps:merge(Acc, CheckFun(Schema, Body, #{}))
-                end, #{}, Spec).
+                end, #{}, Spec);
+
+%% requestBody => #{content => #{ 'application/octet-stream' =>
+%% #{schema => #{ type => string, format => binary}}}
+check_request_body(#{body := Body}, Spec, _Module, _CheckFun, false)when is_map(Spec) ->
+    Body.
 
 %% tags, description, summary, security, deprecated
 meta_to_spec(Meta, Module) ->
@@ -287,6 +292,7 @@ trans_desc(Spec, Hocon) ->
         Desc -> Spec#{description => to_bin(Desc)}
     end.
 
+request_body(#{content := _} = Content, _Module) -> {Content, []};
 request_body([], _Module) -> {[], []};
 request_body(Schema, Module) ->
     {{Props, Refs}, Examples} =

+ 7 - 3
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -460,8 +460,9 @@ process_connect(#channel{ctx = Ctx,
         {ok, _Sess} ->
             RandVal = rand:uniform(?TOKEN_MAXIMUM),
             Token = erlang:list_to_binary(erlang:integer_to_list(RandVal)),
+            NResult = Result#{events => [{event, connected}]},
             iter(Iter,
-                 reply({ok, created}, Token, Msg, Result),
+                 reply({ok, created}, Token, Msg, NResult),
                  Channel#channel{token = Token});
         {error, Reason} ->
             ?SLOG(error, #{ msg => "failed_open_session"
@@ -568,7 +569,8 @@ process_out(Outs, Result, Channel, _) ->
                 Reply ->
                     [Reply | Outs2]
             end,
-    {ok, {outgoing, Outs3}, Channel}.
+    Events = maps:get(events, Result, []),
+    {ok, [{outgoing, Outs3}] ++ Events, Channel}.
 
 %% leaf node
 process_nothing(_, _, Channel) ->
@@ -607,4 +609,6 @@ process_reply(Reply, Result, #channel{session = Session} = Channel, _) ->
     Session2 = emqx_coap_session:set_reply(Reply, Session),
     Outs = maps:get(out, Result, []),
     Outs2 = lists:reverse(Outs),
-    {ok, {outgoing, [Reply | Outs2]}, Channel#channel{session = Session2}}.
+    Events = maps:get(events, Result, []),
+    {ok, [{outgoing, [Reply | Outs2]}] ++ Events,
+     Channel#channel{session = Session2}}.

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_api.erl

@@ -83,7 +83,7 @@ gateway(post, Request) ->
                     {ok, NGwConf} ->
                         {201, NGwConf};
                     {error, Reason} ->
-                        return_http_error(500, Reason)
+                        emqx_gateway_http:reason2resp(Reason)
                 end
         end
     catch

+ 8 - 7
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -745,7 +745,8 @@ common_client_props() ->
                        "due to exceeding the length">>})}
     , {awaiting_rel_cnt,
        mk(integer(),
-          #{ desc => <<"Number of awaiting PUBREC packet">>})}
+          %% FIXME: PUBREC ??
+          #{ desc => <<"Number of awaiting acknowledge packet">>})}
     , {awaiting_rel_max,
        mk(integer(),
           #{ desc => <<"Maximum allowed number of awaiting PUBREC "
@@ -755,25 +756,25 @@ common_client_props() ->
           #{ desc => <<"Number of bytes received by EMQ X Broker">>})}
     , {recv_cnt,
        mk(integer(),
-          #{ desc => <<"Number of TCP packets received">>})}
+          #{ desc => <<"Number of socket packets received">>})}
     , {recv_pkt,
        mk(integer(),
-          #{ desc => <<"Number of MQTT packets received">>})}
+          #{ desc => <<"Number of protocol packets received">>})}
     , {recv_msg,
        mk(integer(),
-          #{ desc => <<"Number of PUBLISH packets received">>})}
+          #{ desc => <<"Number of message packets received">>})}
     , {send_oct,
        mk(integer(),
           #{ desc => <<"Number of bytes sent">>})}
     , {send_cnt,
        mk(integer(),
-          #{ desc => <<"Number of TCP packets sent">>})}
+          #{ desc => <<"Number of socket packets sent">>})}
     , {send_pkt,
        mk(integer(),
-          #{ desc => <<"Number of MQTT packets sent">>})}
+          #{ desc => <<"Number of protocol packets sent">>})}
     , {send_msg,
        mk(integer(),
-          #{ desc => <<"Number of PUBLISH packets sent">>})}
+          #{ desc => <<"Number of message packets sent">>})}
     , {mailbox_len,
        mk(integer(),
           #{ desc => <<"Process mailbox size">>})}

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_api_listeners.erl

@@ -32,7 +32,7 @@
 
 -import(emqx_gateway_api_authn, [schema_authn/0]).
 
-%% minirest/dashbaord_swagger behaviour callbacks
+%% minirest/dashboard_swagger behaviour callbacks
 -export([ api_spec/0
         , paths/0
         , schema/1

+ 44 - 11
apps/emqx_gateway/src/emqx_gateway_conf.erl

@@ -248,7 +248,8 @@ update(Req) ->
     res(emqx_conf:update([gateway], Req, #{override_to => cluster})).
 
 res({ok, Result}) -> {ok, Result};
-res({error, {pre_config_update,emqx_gateway_conf,Reason}}) -> {error, Reason};
+res({error, {pre_config_update,?MODULE,Reason}}) -> {error, Reason};
+res({error, {post_config_update,?MODULE,Reason}}) -> {error, Reason};
 res({error, Reason}) -> {error, Reason}.
 
 bin({LType, LName}) ->
@@ -314,12 +315,12 @@ pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
             NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
             {ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})};
         _ ->
-            {error, already_exist}
+            badres_gateway(already_exist, GwName)
     end;
 pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) ->
     case maps:get(GwName, RawConf, undefined) of
         undefined ->
-            {error, not_found};
+            badres_gateway(not_found, GwName);
         _ ->
             NConf = maps:without([<<"listeners">>, ?AUTHN_BIN], Conf),
             {ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
@@ -341,13 +342,13 @@ pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
                    RawConf,
                    #{GwName => #{<<"listeners">> => NListener}})};
         _ ->
-            {error, already_exist}
+            badres_listener(already_exist, GwName, LType, LName)
     end;
 pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
         undefined ->
-            {error, not_found};
+            badres_listener(not_found, GwName, LType, LName);
         OldConf ->
             NConf = convert_certs(certs_dir(GwName), Conf, OldConf),
             NListener = #{LType => #{LName => NConf}},
@@ -374,14 +375,14 @@ pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
                    RawConf,
                    #{GwName => #{?AUTHN_BIN => Conf}})};
         _ ->
-            {error, already_exist}
+            badres_authn(already_exist, GwName)
     end;
 pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, <<"listeners">>, LType, LName],
            RawConf, undefined) of
         undefined ->
-            {error, not_found};
+            badres_listener(not_found, GwName, LType, LName);
         Listener ->
             case maps:get(?AUTHN_BIN, Listener, undefined) of
                 undefined ->
@@ -391,14 +392,14 @@ pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
                                    #{LType => #{LName => NListener}}}},
                     {ok, emqx_map_lib:deep_merge(RawConf, NGateway)};
                 _ ->
-                    {error, already_exist}
+                    badres_listener_authn(already_exist, GwName, LType, LName)
             end
     end;
 pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, ?AUTHN_BIN], RawConf, undefined) of
         undefined ->
-            {error, not_found};
+            badres_authn(not_found, GwName);
         _ ->
             {ok, emqx_map_lib:deep_merge(
                    RawConf,
@@ -409,11 +410,11 @@ pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
            [GwName, <<"listeners">>, LType, LName],
            RawConf, undefined) of
         undefined ->
-            {error, not_found};
+            badres_listener(not_found, GwName, LType, LName);
         Listener ->
             case maps:get(?AUTHN_BIN, Listener, undefined) of
                 undefined ->
-                    {error, not_found};
+                    badres_listener_authn(not_found, GwName, LType, LName);
                 Auth ->
                     NListener = maps:put(
                                   ?AUTHN_BIN,
@@ -437,6 +438,38 @@ pre_config_update(_, UnknownReq, _RawConf) ->
     logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
     {error, badreq}.
 
+badres_gateway(not_found, GwName) ->
+    {error, {badres, #{resource => gateway, gateway => GwName,
+                       reason => not_found}}};
+badres_gateway(already_exist, GwName) ->
+    {error, {badres, #{resource => gateway, gateway => GwName,
+                       reason => already_exist}}}.
+
+badres_listener(not_found, GwName, LType, LName) ->
+    {error, {badres, #{resource => listener, gateway => GwName,
+                       listener => {GwName, LType, LName},
+                       reason => not_found}}};
+badres_listener(already_exist, GwName, LType, LName) ->
+    {error, {badres, #{resource => listener, gateway => GwName,
+                       listener => {GwName, LType, LName},
+                       reason => already_exist}}}.
+
+badres_authn(not_found, GwName) ->
+    {error, {badres, #{resource => authn, gateway => GwName,
+                       reason => not_found}}};
+badres_authn(already_exist, GwName) ->
+    {error, {badres, #{resource => authn, gateway => GwName,
+                       reason => already_exist}}}.
+
+badres_listener_authn(not_found, GwName, LType, LName) ->
+    {error, {badres, #{resource => listener_authn, gateway => GwName,
+                       listener => {GwName, LType, LName},
+                       reason => not_found}}};
+badres_listener_authn(already_exist, GwName, LType, LName) ->
+    {error, {badres, #{resource => listener_authn, gateway => GwName,
+                       listener => {GwName, LType, LName},
+                       reason => already_exist}}}.
+
 -spec post_config_update(list(atom()),
                          emqx_config:update_request(),
                          emqx_config:config(),

+ 59 - 33
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -23,6 +23,8 @@
 
 -define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
 
+-import(emqx_gateway_utils, [listener_id/3]).
+
 %% Mgmt APIs - gateway
 -export([ gateways/1
         ]).
@@ -59,10 +61,7 @@
         , with_authn/2
         , with_listener_authn/3
         , checks/2
-        , schema_bad_request/0
-        , schema_not_found/0
-        , schema_internal_error/0
-        , schema_no_content/0
+        , reason2resp/1
         ]).
 
 -type gateway_summary() ::
@@ -131,7 +130,7 @@ current_connections_count(GwName) ->
 get_listeners_status(GwName, Config) ->
     Listeners = emqx_gateway_utils:normalize_config(Config),
     lists:map(fun({Type, LisName, ListenOn, _, _}) ->
-        Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName),
+        Name0 = listener_id(GwName, Type, LisName),
         Name = {Name0, ListenOn},
         LisO = #{id => Name0, type => Type, name => LisName},
         case catch esockd:listener(Name) of
@@ -223,12 +222,7 @@ remove_authn(GwName, ListenerId) ->
 
 confexp(ok) -> ok;
 confexp({ok, Res}) -> {ok, Res};
-confexp({error, badarg}) ->
-    error({update_conf_error, badarg});
-confexp({error, not_found}) ->
-    error({update_conf_error, not_found});
-confexp({error, already_exist}) ->
-    error({update_conf_error, already_exist}).
+confexp({error, Reason}) -> error(Reason).
 
 %%--------------------------------------------------------------------
 %% Mgmt APIs - clients
@@ -322,6 +316,59 @@ with_channel(GwName, ClientId, Fun) ->
 %% Utils
 %%--------------------------------------------------------------------
 
+-spec reason2resp({atom(), map()} | any()) -> binary() | any().
+reason2resp({badconf, #{key := Key, value := Value, reason := Reason}}) ->
+    fmt400err("Bad config value '~s' for '~s', reason: ~s",
+              [Value, Key, Reason]);
+reason2resp({badres, #{resource := gateway,
+                       gateway := GwName,
+                       reason := not_found}}) ->
+    fmt400err("The ~s gateway is unloaded", [GwName]);
+
+reason2resp({badres, #{resource := gateway,
+                       gateway := GwName,
+                       reason := already_exist}}) ->
+    fmt400err("The ~s gateway has loaded", [GwName]);
+
+reason2resp({badres, #{resource := listener,
+                       listener := {GwName, LType, LName},
+                       reason := not_found}}) ->
+    fmt400err("Listener ~s not found",
+              [listener_id(GwName, LType, LName)]);
+
+reason2resp({badres, #{resource := listener,
+                       listener := {GwName, LType, LName},
+                       reason := already_exist}}) ->
+    fmt400err("The listener ~s of ~s already exist",
+              [listener_id(GwName, LType, LName), GwName]);
+
+reason2resp({badres, #{resource := authn,
+                       gateway := GwName,
+                       reason := not_found}}) ->
+    fmt400err("The authentication not found on ~s", [GwName]);
+
+reason2resp({badres, #{resource := authn,
+                       gateway := GwName,
+                       reason := already_exist}}) ->
+    fmt400err("The authentication already exist on ~s", [GwName]);
+
+reason2resp({badres, #{resource := listener_authn,
+                       listener := {GwName, LType, LName},
+                       reason := not_found}}) ->
+    fmt400err("The authentication not found on ~s",
+              [listener_id(GwName, LType, LName)]);
+
+reason2resp({badres, #{resource := listener_authn,
+                       listener := {GwName, LType, LName},
+                       reason := already_exist}}) ->
+    fmt400err("The authentication already exist on ~s",
+              [listener_id(GwName, LType, LName)]);
+
+reason2resp(R) -> return_http_error(500, R).
+
+fmt400err(Fmt, Args) ->
+    return_http_error(400, io_lib:format(Fmt, Args)).
+
 -spec return_http_error(integer(), any()) -> {integer(), binary()}.
 return_http_error(Code, Msg) ->
     {Code, emqx_json:encode(
@@ -378,19 +425,12 @@ with_gateway(GwName0, Fun) ->
             Path = lists:concat(
                      lists:join(".", lists:map(fun to_list/1, Path0))),
             return_http_error(404, "Resource not found. path: " ++ Path);
-        %% Exceptions from: confexp/1
-        error : {update_conf_error, badarg} ->
-            return_http_error(400, "Bad arguments");
-        error : {update_conf_error, not_found} ->
-            return_http_error(404, "Resource not found");
-        error : {update_conf_error, already_exist} ->
-            return_http_error(400, "Resource already exist");
         Class : Reason : Stk ->
             ?SLOG(error, #{ msg => "uncatched_error"
                           , reason => {Class, Reason}
                           , stacktrace => Stk
                           }),
-            return_http_error(500, {Class, Reason, Stk})
+            reason2resp(Reason)
     end.
 
 -spec checks(list(), map()) -> ok.
@@ -408,20 +448,6 @@ to_list(A) when is_atom(A) ->
 to_list(B) when is_binary(B) ->
     binary_to_list(B).
 
-%%--------------------------------------------------------------------
-%% common schemas
-
-schema_bad_request() ->
-    emqx_mgmt_util:error_schema(
-      <<"Some Params missed">>, ['PARAMETER_MISSED']).
-schema_internal_error() ->
-    emqx_mgmt_util:error_schema(
-      <<"Ineternal Server Error">>, ['INTERNAL_SERVER_ERROR']).
-schema_not_found() ->
-    emqx_mgmt_util:error_schema(<<"Resource Not Found">>).
-schema_no_content() ->
-    #{description => <<"No Content">>}.
-
 %%--------------------------------------------------------------------
 %% Internal funcs
 

+ 6 - 6
apps/emqx_gateway/src/emqx_gateway_insta_sup.erl

@@ -112,7 +112,7 @@ init([Gateway, Ctx, _GwDscrptr]) ->
         true ->
             case cb_gateway_load(State) of
                 {error, Reason} ->
-                    {stop, {load_gateway_failure, Reason}};
+                    {stop, Reason};
                 {ok, NState} ->
                     {ok, NState}
             end
@@ -360,7 +360,7 @@ cb_gateway_unload(State = #state{name = GwName,
                           , reason => {Class, Reason}
                           , stacktrace => Stk
                           }),
-            {error, {Class, Reason, Stk}}
+            {error, Reason}
     after
         _ = do_deinit_authn(State#state.authns)
     end.
@@ -381,7 +381,7 @@ cb_gateway_load(State = #state{name = GwName,
         case CbMod:on_gateway_load(Gateway, NCtx) of
             {error, Reason} ->
                 do_deinit_authn(AuthnNames),
-                throw({callback_return_error, Reason});
+                {error, Reason};
             {ok, ChildPidOrSpecs, GwState} ->
                 ChildPids = start_child_process(ChildPidOrSpecs),
                 {ok, State#state{
@@ -403,7 +403,7 @@ cb_gateway_load(State = #state{name = GwName,
                           , reason => {Class, Reason1}
                           , stacktrace => Stk
                           }),
-            {error, {Class, Reason1, Stk}}
+            {error, Reason1}
     end.
 
 cb_gateway_update(Config,
@@ -412,7 +412,7 @@ cb_gateway_update(Config,
     try
         #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
         case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of
-            {error, Reason} -> throw({callback_return_error, Reason});
+            {error, Reason} -> {error, Reason};
             {ok, ChildPidOrSpecs, NGwState} ->
                 %% XXX: Hot-upgrade ???
                 ChildPids = start_child_process(ChildPidOrSpecs),
@@ -430,7 +430,7 @@ cb_gateway_update(Config,
                           , reason => {Class, Reason1}
                           , stacktrace => Stk
                           }),
-            {error, {Class, Reason1, Stk}}
+            {error, Reason1}
     end.
 
 start_child_process([]) -> [];

+ 16 - 5
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -118,6 +118,7 @@ fields(mqttsn) ->
     [ {gateway_id,
        sc(integer(),
           #{ default => 1
+           , nullable => false
            , desc =>
 "MQTT-SN Gateway Id.<br>
 When the <code>broadcast</code> option is enabled,
@@ -142,6 +143,7 @@ The client just sends its PUBLISH messages to a GW"
     , {predefined,
        sc(hoconsc:array(ref(mqttsn_predefined)),
           #{ default => []
+           , nullable => {true, recursively}
            , desc =>
 <<"The Pre-defined topic ids and topic names.<br>
 A 'pre-defined' topic id is a topic id whose mapping to a topic name
@@ -217,6 +219,7 @@ fields(lwm2m) ->
     [ {xml_dir,
        sc(binary(),
           #{ default =>"etc/lwm2m_xml"
+           , nullable => false
            , desc => "The Directory for LwM2M Resource defination"
            })}
     , {lifetime_min,
@@ -265,18 +268,21 @@ beyond this time window are temporarily stored in memory."
 fields(exproto) ->
     [ {server,
        sc(ref(exproto_grpc_server),
-          #{ desc => "Configurations for starting the <code>ConnectionAdapter</code> service"
+          #{ nullable => false
+           , desc => "Configurations for starting the <code>ConnectionAdapter</code> service"
            })}
     , {handler,
        sc(ref(exproto_grpc_handler),
-          #{ desc => "Configurations for request to <code>ConnectionHandler</code> service"
+          #{ nullable => false
+           , desc => "Configurations for request to <code>ConnectionHandler</code> service"
            })}
     , {listeners, sc(ref(udp_tcp_listeners))}
     ] ++ gateway_common_options();
 
 fields(exproto_grpc_server) ->
     [ {bind,
-       sc(hoconsc:union([ip_port(), integer()]))}
+       sc(hoconsc:union([ip_port(), integer()]),
+          #{nullable => false})}
     , {ssl,
        sc(ref(ssl_server_opts),
           #{ nullable => {true, recursively}
@@ -284,7 +290,7 @@ fields(exproto_grpc_server) ->
     ];
 
 fields(exproto_grpc_handler) ->
-    [ {address, sc(binary())}
+    [ {address, sc(binary(), #{nullable => false})}
     , {ssl,
        sc(ref(ssl_client_opts),
           #{ nullable => {true, recursively}
@@ -316,11 +322,13 @@ fields(lwm2m_translators) ->
 For each new LwM2M client that succeeds in going online, the gateway creates
 a the subscription relationship to receive downstream commands and send it to
 the LwM2M client"
+           , nullable => false
            })}
     , {response,
        sc(ref(translator),
           #{ desc =>
 "The topic for gateway to publish the acknowledge events from LwM2M client"
+           , nullable => false
            })}
     , {notify,
        sc(ref(translator),
@@ -328,21 +336,24 @@ the LwM2M client"
 "The topic for gateway to publish the notify events from LwM2M client.<br>
 After succeed observe a resource of LwM2M client, Gateway will send the
 notifyevents via this topic, if the client reports any resource changes"
+           , nullable => false
            })}
     , {register,
        sc(ref(translator),
           #{ desc =>
 "The topic for gateway to publish the register events from LwM2M client.<br>"
+           , nullable => false
            })}
     , {update,
        sc(ref(translator),
           #{ desc =>
 "The topic for gateway to publish the update events from LwM2M client.<br>"
+           , nullable => false
            })}
     ];
 
 fields(translator) ->
-    [ {topic, sc(binary())}
+    [ {topic, sc(binary(), #{nullable => false})}
     , {qos, sc(range(0, 2), #{default => 0})}
     ];
 

+ 1 - 0
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -90,6 +90,7 @@ childspec(Id, Type, Mod, Args) ->
     -> {ok, pid()}
      | {error, supervisor:startchild_err()}.
 supervisor_ret({ok, Pid, _Info}) -> {ok, Pid};
+supervisor_ret({error, {Reason, _Child}}) -> {error, Reason};
 supervisor_ret(Ret) -> Ret.
 
 -spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id())

+ 7 - 1
apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl

@@ -75,7 +75,13 @@ stop_grpc_server(GwName) ->
 start_grpc_client_channel(_GwName, undefined) ->
     undefined;
 start_grpc_client_channel(GwName, Options = #{address := Address}) ->
-    {Host, Port} = emqx_gateway_utils:parse_address(Address),
+    {Host, Port} = try emqx_gateway_utils:parse_address(Address)
+                   catch error : badarg ->
+                         throw({badconf, #{key => address,
+                                           value => Address,
+                                           reason => illegal_grpc_address
+                                          }})
+                   end,
     case maps:to_list(maps:get(ssl, Options, #{})) of
         [] ->
             SvrAddr = compose_http_uri(http, Host, Port),

+ 14 - 8
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl

@@ -50,14 +50,20 @@ unreg() ->
 on_gateway_load(_Gateway = #{ name := GwName,
                               config := Config
                             }, Ctx) ->
-    %% Xml registry
-    {ok, RegPid} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, Config)),
-
-    Listeners = emqx_gateway_utils:normalize_config(Config),
-    ListenerPids = lists:map(fun(Lis) ->
-                                     start_listener(GwName, Ctx, Lis)
-                             end, Listeners),
-    {ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}}.
+    XmlDir = maps:get(xml_dir, Config),
+    case emqx_lwm2m_xml_object_db:start_link(XmlDir) of
+        {ok, RegPid} ->
+            Listeners = emqx_gateway_utils:normalize_config(Config),
+            ListenerPids = lists:map(fun(Lis) ->
+                                             start_listener(GwName, Ctx, Lis)
+                                     end, Listeners),
+            {ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}};
+        {error, Reason} ->
+            throw({badconf, #{ key => xml_dir
+                             , value => XmlDir
+                             , reason => Reason
+                             }})
+    end.
 
 on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
     GwName = maps:get(name, Gateway),

+ 11 - 3
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl

@@ -47,6 +47,11 @@
 %% API Function Definitions
 %% ------------------------------------------------------------------
 
+-spec start_link(string())
+    -> {ok, pid()}
+     | ignore
+     | {error, no_xml_files_found}
+     | {error, term()}.
 start_link(XmlDir) ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []).
 
@@ -85,8 +90,11 @@ stop() ->
 init([XmlDir]) ->
     _ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]),
     _ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]),
-    load(XmlDir),
-    {ok, #state{}}.
+    case load(XmlDir) of
+        ok ->
+            {ok, #state{}};
+        {error, Reason} -> {stop, Reason}
+    end.
 
 handle_call(_Request, _From, State) ->
     {reply, ignored, State}.
@@ -116,7 +124,7 @@ load(BaseDir) ->
                     Wild
             end,
     case filelib:wildcard(Wild2) of
-        [] -> error(no_xml_files_found, BaseDir);
+        [] -> {error, no_xml_files_found};
         AllXmlFiles -> load_loop(AllXmlFiles)
     end.
 

+ 23 - 15
apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl

@@ -245,8 +245,9 @@ t_load_unload_gateway(_) ->
                          ?CONF_STOMP_AUTHN_1,
                          ?CONF_STOMP_LISTENER_1),
     {ok, _} = emqx_gateway_conf:load_gateway(stomp, StompConf1),
-    {error, already_exist} =
-        emqx_gateway_conf:load_gateway(stomp, StompConf1),
+    ?assertMatch(
+       {error, {badres, #{reason := already_exist}}},
+       emqx_gateway_conf:load_gateway(stomp, StompConf1)),
     assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])),
 
     {ok, _} = emqx_gateway_conf:update_gateway(stomp, StompConf2),
@@ -255,8 +256,9 @@ t_load_unload_gateway(_) ->
     ok = emqx_gateway_conf:unload_gateway(stomp),
     ok = emqx_gateway_conf:unload_gateway(stomp),
 
-    {error, not_found} =
-        emqx_gateway_conf:update_gateway(stomp, StompConf2),
+    ?assertMatch(
+       {error, {badres, #{reason := not_found}}},
+       emqx_gateway_conf:update_gateway(stomp, StompConf2)),
 
     ?assertException(error, {config_not_found, [gateway, stomp]},
                      emqx:get_raw_config([gateway, stomp])),
@@ -280,8 +282,9 @@ t_load_remove_authn(_) ->
 
     ok = emqx_gateway_conf:remove_authn(<<"stomp">>),
 
-    {error, not_found} =
-        emqx_gateway_conf:update_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_2),
+    ?assertMatch(
+       {error, {badres, #{reason := not_found}}},
+       emqx_gateway_conf:update_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_2)),
 
     ?assertException(
        error, {config_not_found, [gateway, stomp, authentication]},
@@ -312,9 +315,10 @@ t_load_remove_listeners(_) ->
     ok = emqx_gateway_conf:remove_listener(
            <<"stomp">>, {<<"tcp">>, <<"default">>}),
 
-    {error, not_found} =
-        emqx_gateway_conf:update_listener(
-          <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_2),
+    ?assertMatch(
+       {error, {badres, #{reason := not_found}}},
+       emqx_gateway_conf:update_listener(
+         <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_2)),
 
     ?assertException(
        error, {config_not_found, [gateway, stomp, listeners, tcp, default]},
@@ -352,9 +356,10 @@ t_load_remove_listener_authn(_) ->
     ok = emqx_gateway_conf:remove_authn(
            <<"stomp">>, {<<"tcp">>, <<"default">>}),
 
-    {error, not_found} =
-        emqx_gateway_conf:update_authn(
-          <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_2),
+    ?assertMatch(
+       {error, {badres, #{reason := not_found}}},
+       emqx_gateway_conf:update_authn(
+         <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_2)),
 
     Path = [gateway, stomp, listeners, tcp, default, authentication],
     ?assertException(
@@ -426,9 +431,12 @@ t_add_listener_with_certs_content(_) ->
     ok = emqx_gateway_conf:remove_listener(
            <<"stomp">>, {<<"ssl">>, <<"default">>}),
     assert_ssl_confs_files_deleted(SslConf),
-    {error, not_found} =
-        emqx_gateway_conf:update_listener(
-          <<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2),
+
+    ?assertMatch(
+       {error, {badres, #{reason := not_found}}},
+       emqx_gateway_conf:update_listener(
+         <<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2)),
+
     ?assertException(
        error, {config_not_found, [gateway, stomp, listeners, ssl, default]},
        emqx:get_raw_config([gateway, stomp, listeners, ssl, default])

+ 3 - 3
apps/emqx_management/src/emqx_mgmt_api_banned.erl

@@ -101,15 +101,15 @@ fields(ban) ->
             desc => <<"Banned type clientid, username, peerhost">>,
             nullable => false,
             example => username})},
-        {who, hoconsc:mk(binary(), #{
+        {who, hoconsc:mk(emqx_schema:unicode_binary(), #{
             desc => <<"Client info as banned type">>,
             nullable => false,
-            example => <<"Badass">>})},
+            example => <<"Badass"/utf8>>})},
         {by, hoconsc:mk(binary(), #{
             desc => <<"Commander">>,
             nullable => true,
             example => <<"mgmt_api">>})},
-        {reason, hoconsc:mk(binary(), #{
+        {reason, hoconsc:mk(emqx_schema:unicode_binary(), #{
             desc => <<"Banned reason">>,
             nullable => true,
             example => <<"Too many requests">>})},

+ 26 - 23
apps/emqx_modules/src/emqx_telemetry.erl

@@ -220,37 +220,24 @@ os_info() ->
             [{os_name, Name},
              {os_version, Version}];
         {unix, _} ->
-            case file:read_file_info("/etc/os-release") of
+            case file:read_file("/etc/os-release") of
                 {error, _} ->
                     [{os_name, "Unknown"},
                      {os_version, "Unknown"}];
-                {ok, FileInfo} ->
-                    case FileInfo#file_info.access of
-                        Access when Access =:= read orelse Access =:= read_write ->
-                            OSInfo = lists:foldl(fun(Line, Acc) ->
-                                                     [Var, Value] = string:tokens(Line, "="),
-                                                     NValue = case Value of
-                                                                  _ when is_list(Value) ->
-                                                                      lists:nth(1, string:tokens(Value, "\""));
-                                                                  _ ->
-                                                                      Value
-                                                              end,
-                                                     [{Var, NValue} | Acc]
-                                                 end, [], string:tokens(os:cmd("cat /etc/os-release"), "\n")),
-                            [{os_name, get_value("NAME", OSInfo, "Unknown")},
-                             {os_version, get_value("VERSION", OSInfo,
-                                                    get_value("VERSION_ID", OSInfo, "Unknown"))}];
-                        _ ->
-                            [{os_name, "Unknown"},
-                             {os_version, "Unknown"}]
-                    end
+                {ok, FileContent} ->
+                    OSInfo = parse_os_release(FileContent),
+                    [{os_name, get_value("NAME", OSInfo)},
+                     {os_version, get_value("VERSION", OSInfo,
+                                            get_value("VERSION_ID", OSInfo,
+                                                      get_value("PRETTY_NAME", OSInfo)))}]
             end;
         {win32, nt} ->
             Ver = os:cmd("ver"),
             case re:run(Ver, "[a-zA-Z ]+ \\[Version ([0-9]+[\.])+[0-9]+\\]", [{capture, none}]) of
                 match ->
                     [NVer | _] = string:tokens(Ver, "\r\n"),
-                    {match, [Version]} = re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]),
+                    {match, [Version]} =
+                        re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]),
                     [Name | _] = string:split(NVer, " [Version "),
                     [{os_name, Name},
                      {os_version, Version}];
@@ -307,7 +294,8 @@ generate_uuid() ->
     <<NTimeHigh:16>> = <<16#01:4, TimeHigh:12>>,
     <<NClockSeq:16>> = <<1:1, 0:1, ClockSeq:14>>,
     <<Node:48>> = <<First:7, 1:1, Last:40>>,
-    list_to_binary(io_lib:format("~.16B-~.16B-~.16B-~.16B-~.16B", [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])).
+    list_to_binary(io_lib:format( "~.16B-~.16B-~.16B-~.16B-~.16B"
+                                , [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])).
 
 get_telemetry(#state{uuid = UUID}) ->
     OSInfo = os_info(),
@@ -339,7 +327,22 @@ report_telemetry(State = #state{url = URL}) ->
 httpc_request(Method, URL, Headers, Body) ->
     httpc:request(Method, {URL, Headers, "application/json", Body}, [], []).
 
+parse_os_release(FileContent) ->
+    lists:foldl(fun(Line, Acc) ->
+                        [Var, Value] = string:tokens(Line, "="),
+                        NValue = case Value of
+                                     _ when is_list(Value) ->
+                                         lists:nth(1, string:tokens(Value, "\""));
+                                     _ ->
+                                         Value
+                                 end,
+                        [{Var, NValue} | Acc]
+                end,
+                [], string:tokens(binary:bin_to_list(FileContent), "\n")).
+
 bin(L) when is_list(L) ->
     list_to_binary(L);
+bin(A) when is_atom(A) ->
+    atom_to_binary(A);
 bin(B) when is_binary(B) ->
     B.

+ 1 - 0
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -68,6 +68,7 @@ fields("rule_events") ->
 fields("rule_test") ->
     [ {"context", sc(hoconsc:union([ ref("ctx_pub")
                                    , ref("ctx_sub")
+                                   , ref("ctx_unsub")
                                    , ref("ctx_delivered")
                                    , ref("ctx_acked")
                                    , ref("ctx_dropped")

+ 6 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -257,11 +257,16 @@ format_output(Outputs) ->
     [do_format_output(Out) || Out <- Outputs].
 
 do_format_output(#{mod := Mod, func := Func, args := Args}) ->
-    #{function => list_to_binary(lists:concat([Mod,":",Func])),
+    #{function => printable_function_name(Mod, Func),
       args => maps:remove(preprocessed_tmpl, Args)};
 do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
     BridgeChannelId.
 
+printable_function_name(emqx_rule_outputs, Func) ->
+    Func;
+printable_function_name(Mod, Func) ->
+    list_to_binary(lists:concat([Mod,":",Func])).
+
 get_rule_metrics(Id) ->
     Format = fun (Node, #{matched := Matched,
                           rate := Current,

+ 1 - 0
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -182,6 +182,7 @@ rule_name() ->
     {"name", sc(binary(),
         #{ desc => "The name of the rule"
          , default => ""
+         , nullable => false
          , example => "foo"
          })}.