Просмотр исходного кода

Merge branch 'master' into using-erlang-system-time-5

JianBo He 3 лет назад
Родитель
Сommit
a715573c64

+ 20 - 1
.github/workflows/build_and_push_docker_images.yaml

@@ -183,9 +183,19 @@ jobs:
           img_suffix="elixir-${{ matrix.arch }}"
           img_labels="org.opencontainers.image.elixir.version=${{ matrix.elixir }}\n${img_labels}"
         fi
+
+        if [ ${{ matrix.profile }} = "emqx" ]; then
+          img_labels="org.opencontainers.image.edition=Opensource\n${img_labels}"
+        fi
+
+        if [ ${{ matrix.profile }} = "emqx-enterprise" ]; then
+          img_labels="org.opencontainers.image.edition=Enterprise\n${img_labels}"
+        fi
+
         if [[ ${{ matrix.os[0] }} =~ "alpine" ]]; then
           img_suffix="${img_suffix}-alpine"
         fi
+
         echo "::set-output name=emqx_name::${emqx_name}"
         echo "::set-output name=img_suffix::${img_suffix}"
         echo "::set-output name=img_labels::${img_labels}"
@@ -299,10 +309,19 @@ jobs:
             img_suffix="elixir-${{ matrix.arch }}"
             img_labels="org.opencontainers.image.elixir.version=${{ matrix.elixir }}\n$img_labels"
           fi
+
+          if [ ${{ matrix.profile }} = "emqx" ]; then
+            img_labels="org.opencontainers.image.edition=Opensource\n${img_labels}"
+          fi
+
+          if [ ${{ matrix.profile }} = "emqx-enterprise" ]; then
+            img_labels="org.opencontainers.image.edition=Enterprise\n${img_labels}"
+          fi
+
           if [[ ${{ matrix.os[0] }} =~ "alpine" ]]; then
             img_suffix="${img_suffix}-alpine"
           fi
-          echo "::set-output name=img::${img}"
+
           echo "::set-output name=emqx_name::${emqx_name}"
           echo "::set-output name=img_suffix::${img_suffix}"
           echo "::set-output name=img_labels::${img_labels}"

+ 2 - 0
CHANGES-5.0.md

@@ -9,6 +9,7 @@
 * Fix the extra / prefix when CoAP gateway parsing client topics. [#8658](https://github.com/emqx/emqx/pull/8658)
 * Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857)
 * Fix delayed publish inaccurate caused by os time change. [#8926](https://github.com/emqx/emqx/pull/8926)
+* Fix that EMQX can't start when the retainer is disabled [#8911](https://github.com/emqx/emqx/pull/8911)
 
 ## Enhancements
 
@@ -16,6 +17,7 @@
 * Change the `/gateway` API path to plural form. [#8823](https://github.com/emqx/emqx/pull/8823)
 * Remove `node.etc_dir` from emqx.conf, because it is never used.
   Also allow user to customize the logging directory [#8892](https://github.com/emqx/emqx/pull/8892)
+* Added a new API `POST /listeners` for creating listener. [#8876](https://github.com/emqx/emqx/pull/8876)
 
 # 5.0.7
 

+ 2 - 2
apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl

@@ -149,8 +149,8 @@ authenticate(
                     of
                         ok ->
                             {ok, emqx_authn_utils:is_superuser(Selected)};
-                        {error, Reason} ->
-                            {error, Reason}
+                        {error, _Reason} ->
+                            ignore
                     end;
                 {error, Reason} ->
                     ?TRACE_AUTHN_PROVIDER(error, "redis_query_failed", #{

+ 13 - 1
apps/emqx_authn/test/emqx_authn_redis_SUITE.erl

@@ -173,6 +173,9 @@ test_user_auth(#{
         {create_authenticator, ?GLOBAL, AuthConfig}
     ),
 
+    {ok, [#{provider := emqx_authn_redis, state := State}]} =
+        emqx_authentication:list_authenticators(?GLOBAL),
+
     Credentials = Credentials0#{
         listener => 'tcp:default',
         protocol => mqtt
@@ -180,6 +183,15 @@ test_user_auth(#{
 
     ?assertEqual(Result, emqx_access_control:authenticate(Credentials)),
 
+    AuthnResult =
+        case Result of
+            {error, _} ->
+                ignore;
+            Any ->
+                Any
+        end,
+    ?assertEqual(AuthnResult, emqx_authn_redis:authenticate(Credentials, State)),
+
     emqx_authn_test_lib:delete_authenticators(
         [authentication],
         ?GLOBAL
@@ -466,7 +478,7 @@ user_seeds() ->
                 <<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt is_superuser">>,
                 <<"password_hash_algorithm">> => #{<<"name">> => <<"bcrypt">>}
             },
-            result => {error, bad_username_or_password}
+            result => {error, not_authorized}
         },
 
         #{

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -778,7 +778,7 @@ to_bin(List) when is_list(List) ->
 to_bin(Boolean) when is_boolean(Boolean) -> Boolean;
 to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
 to_bin({Type, Args}) ->
-    unicode:characters_to_binary(io_lib:format("~p(~p)", [Type, Args]));
+    unicode:characters_to_binary(io_lib:format("~ts(~p)", [Type, Args]));
 to_bin(X) ->
     X.
 

+ 82 - 28
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -96,6 +96,16 @@ schema("/listeners") ->
                         listener_id_status_example()
                     )
             }
+        },
+        post => #{
+            tags => [<<"listeners">>],
+            desc => <<"Create the specified listener on all nodes.">>,
+            parameters => [],
+            'requestBody' => create_listener_schema(#{bind => true}),
+            responses => #{
+                200 => listener_schema(#{bind => true}),
+                400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
+            }
         }
     };
 schema("/listeners/:id") ->
@@ -129,7 +139,8 @@ schema("/listeners/:id") ->
             responses => #{
                 200 => listener_schema(#{bind => true}),
                 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
-            }
+            },
+            deprecated => true
         },
         delete => #{
             tags => [<<"listeners">>],
@@ -251,10 +262,10 @@ fields(node_status) ->
             })},
         {status, ?HOCON(?R_REF(status))}
     ];
+fields({Type, with_name}) ->
+    listener_struct_with_name(Type);
 fields(Type) ->
-    Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
-    [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
-    Schema.
+    listener_struct(Type).
 
 listener_schema(Opts) ->
     emqx_dashboard_swagger:schema_with_example(
@@ -262,6 +273,17 @@ listener_schema(Opts) ->
         tcp_schema_example()
     ).
 
+create_listener_schema(Opts) ->
+    Schemas = [
+        ?R_REF(Mod, {Type, with_name})
+     || #{ref := ?R_REF(Mod, Type)} <- listeners_info(Opts)
+    ],
+    Example = maps:remove(id, tcp_schema_example()),
+    emqx_dashboard_swagger:schema_with_example(
+        ?UNION(Schemas),
+        Example#{name => <<"demo">>}
+    ).
+
 listeners_type() ->
     lists:map(
         fun({Type, _}) -> list_to_existing_atom(Type) end,
@@ -339,7 +361,9 @@ list_listeners(get, #{query_string := Query}) ->
             {ok, Type} -> listener_type_filter(atom_to_binary(Type), Listeners);
             error -> Listeners
         end,
-    {200, listener_status_by_id(NodeL)}.
+    {200, listener_status_by_id(NodeL)};
+list_listeners(post, #{body := Body}) ->
+    create_listener(Body).
 
 crud_listeners_by_id(get, #{bindings := #{id := Id0}}) ->
     Listeners =
@@ -382,23 +406,8 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
         _ ->
             {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
     end;
-crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
-    case parse_listener_conf(Body0) of
-        {Id, Type, Name, Conf} ->
-            Path = [listeners, Type, Name],
-            case create(Path, Conf) of
-                {ok, #{raw_config := _RawConf}} ->
-                    crud_listeners_by_id(get, #{bindings => #{id => Id}});
-                {error, already_exist} ->
-                    {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}};
-                {error, Reason} ->
-                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
-            end;
-        {error, Reason} ->
-            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
-        _ ->
-            {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
-    end;
+crud_listeners_by_id(post, #{body := Body}) ->
+    create_listener(Body);
 crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
     {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
     case ensure_remove([listeners, Type, Name]) of
@@ -408,13 +417,24 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
 
 parse_listener_conf(Conf0) ->
     Conf1 = maps:without([<<"running">>, <<"current_connections">>], Conf0),
-    {IdBin, Conf2} = maps:take(<<"id">>, Conf1),
-    {TypeBin, Conf3} = maps:take(<<"type">>, Conf2),
-    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
+    {TypeBin, Conf2} = maps:take(<<"type">>, Conf1),
     TypeAtom = binary_to_existing_atom(TypeBin),
-    case Type =:= TypeAtom of
-        true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
-        false -> {error, listener_type_inconsistent}
+
+    case maps:take(<<"id">>, Conf2) of
+        {IdBin, Conf3} ->
+            {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
+            case Type =:= TypeAtom of
+                true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
+                false -> {error, listener_type_inconsistent}
+            end;
+        _ ->
+            case maps:take(<<"name">>, Conf2) of
+                {Name, Conf3} ->
+                    IdBin = <<TypeBin/binary, $:, Name/binary>>,
+                    {binary_to_atom(IdBin), TypeAtom, Name, Conf3};
+                _ ->
+                    {error, listener_config_invalid}
+            end
     end.
 
 stop_listeners_by_id(Method, Body = #{bindings := Bindings}) ->
@@ -787,3 +807,37 @@ tcp_schema_example() ->
         type => tcp,
         zone => default
     }.
+
+create_listener(Body) ->
+    case parse_listener_conf(Body) of
+        {Id, Type, Name, Conf} ->
+            Path = [listeners, Type, Name],
+            case create(Path, Conf) of
+                {ok, #{raw_config := _RawConf}} ->
+                    crud_listeners_by_id(get, #{bindings => #{id => Id}});
+                {error, already_exist} ->
+                    {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}};
+                {error, Reason} ->
+                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+            end;
+        {error, Reason} ->
+            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+    end.
+
+listener_struct(Type) ->
+    Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
+    [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
+    Schema.
+
+listener_struct_with_name(Type) ->
+    BaseSchema = listener_struct(Type),
+    lists:keyreplace(
+        id,
+        1,
+        BaseSchema,
+        {name,
+            ?HOCON(binary(), #{
+                desc => "Listener name",
+                required => true
+            })}
+    ).

+ 29 - 0
apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl

@@ -37,6 +37,35 @@ t_list_listeners(_) ->
     Res = request(get, Path, [], []),
     #{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(),
     ?assertEqual(length(Expect), length(Res)),
+
+    %% POST /listeners
+    ListenerId = <<"tcp:default">>,
+    NewListenerId = <<"tcp:new">>,
+
+    OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
+    NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
+
+    OriginListener = request(get, OriginPath, [], []),
+
+    %% create with full options
+    ?assertEqual({error, not_found}, is_running(NewListenerId)),
+    ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
+
+    OriginListener2 = maps:remove(<<"id">>, OriginListener),
+    NewConf = OriginListener2#{
+        <<"name">> => <<"new">>,
+        <<"bind">> => <<"0.0.0.0:2883">>
+    },
+    Create = request(post, Path, [], NewConf),
+    ?assertEqual(lists:sort(maps:keys(OriginListener)), lists:sort(maps:keys(Create))),
+    Get1 = request(get, NewPath, [], []),
+    ?assertMatch(Create, Get1),
+    ?assert(is_running(NewListenerId)),
+
+    %% delete
+    ?assertEqual([], delete(NewPath)),
+    ?assertEqual({error, not_found}, is_running(NewListenerId)),
+    ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
     ok.
 
 t_tcp_crud_listeners_by_id(_) ->

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.app.src

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.4"},
+    {vsn, "5.0.5"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx]},

+ 1 - 6
apps/emqx_retainer/src/emqx_retainer.erl

@@ -348,16 +348,12 @@ enable_retainer(
     #{context_id := ContextId} = State,
     #{
         msg_clear_interval := ClearInterval,
-        backend := BackendCfg,
-        flow_control := FlowControl
+        backend := BackendCfg
     }
 ) ->
     NewContextId = ContextId + 1,
     Context = create_resource(new_context(NewContextId), BackendCfg),
     load(Context),
-    emqx_limiter_server:add_bucket(
-        ?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined)
-    ),
     State#{
         enable := true,
         context_id := NewContextId,
@@ -373,7 +369,6 @@ disable_retainer(
     } = State
 ) ->
     unload(),
-    emqx_limiter_server:del_bucket(?APP, internal),
     ok = close_resource(Context),
     State#{
         enable := false,

+ 13 - 0
apps/emqx_retainer/src/emqx_retainer_app.erl

@@ -18,6 +18,8 @@
 
 -behaviour(application).
 
+-include("emqx_retainer.hrl").
+
 -export([
     start/2,
     stop/1
@@ -25,8 +27,19 @@
 
 start(_Type, _Args) ->
     ok = emqx_retainer_mnesia_cli:load(),
+    init_bucket(),
     emqx_retainer_sup:start_link().
 
 stop(_State) ->
     ok = emqx_retainer_mnesia_cli:unload(),
+    delete_bucket(),
     ok.
+
+init_bucket() ->
+    #{flow_control := FlowControl} = emqx:get_config([retainer]),
+    emqx_limiter_server:add_bucket(
+        ?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined)
+    ).
+
+delete_bucket() ->
+    emqx_limiter_server:del_bucket(?APP, internal).

+ 17 - 2
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -31,14 +31,16 @@ all() ->
     [
         {group, mnesia_without_indices},
         {group, mnesia_with_indices},
-        {group, mnesia_reindex}
+        {group, mnesia_reindex},
+        {group, test_disable_then_start}
     ].
 
 groups() ->
     [
         {mnesia_without_indices, [sequence], common_tests()},
         {mnesia_with_indices, [sequence], common_tests()},
-        {mnesia_reindex, [sequence], [t_reindex]}
+        {mnesia_reindex, [sequence], [t_reindex]},
+        {test_disable_then_start, [sequence], [test_disable_then_start]}
     ].
 
 common_tests() ->
@@ -624,6 +626,19 @@ t_get_basic_usage_info(_Config) ->
     ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()),
     ok.
 
+%% test whether the app can start normally after disabling emqx_retainer
+%% fix: https://github.com/emqx/emqx/pull/8911
+test_disable_then_start(_Config) ->
+    emqx_retainer:update_config(#{<<"enable">> => false}),
+    ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
+    ok = application:stop(emqx_retainer),
+    timer:sleep(100),
+    ?assertEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
+    ok = application:ensure_started(emqx_retainer),
+    timer:sleep(100),
+    ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
+    ok.
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------

+ 9 - 4
rebar.config.erl

@@ -482,11 +482,16 @@ emqx_etc_overlay_per_edition(ee) ->
     ].
 
 get_vsn(Profile) ->
-    %% to make it compatible to Linux and Windows,
-    %% we must use bash to execute the bash file
-    %% because "./" will not be recognized as an internal or external command
-    os_cmd("pkg-vsn.sh " ++ atom_to_list(Profile)).
+    case os:getenv("PKG_VSN") of
+        false ->
+            os_cmd("pkg-vsn.sh " ++ atom_to_list(Profile));
+        Vsn ->
+            Vsn
+    end.
 
+%% to make it compatible to Linux and Windows,
+%% we must use bash to execute the bash file
+%% because "./" will not be recognized as an internal or external command
 os_cmd(Cmd) ->
     Output = os:cmd("bash " ++ Cmd),
     re:replace(Output, "\n", "", [{return, list}]).