Просмотр исходного кода

Merge remote-tracking branch 'origin/main-v4.3' into main-v4.3

曹文涛 4 лет назад
Родитель
Сommit
84878e5f7c

+ 8 - 2
.ci/build_packages/tests.sh

@@ -145,15 +145,21 @@ relup_test(){
         find . -maxdepth 1 -name "${EMQX_NAME}-*-${ARCH}.zip" |
             while read -r pkg; do
                 packagename=$(basename "${pkg}")
-                unzip "$packagename"
+                unzip -q "$packagename"
                 ./emqx/bin/emqx start || ( tail emqx/log/emqx.log.1 && exit 1 )
                 ./emqx/bin/emqx_ctl status
                 ./emqx/bin/emqx versions
                 cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${TARGET_VERSION}-${ARCH}".zip ./emqx/releases
                 ./emqx/bin/emqx install "${TARGET_VERSION}"
                 [ "$(./emqx/bin/emqx versions |grep permanent | awk '{print $2}')" = "${TARGET_VERSION}" ] || exit 1
+                export EMQX_WAIT_FOR_STOP=300
                 ./emqx/bin/emqx_ctl status
-                ./emqx/bin/emqx stop
+                if ! ./emqx/bin/emqx stop; then
+                    cat emqx/log/erlang.log.1 || true
+                    cat emqx/log/emqx.log.1 || true
+                    echo "failed to stop emqx"
+                    exit 1
+                fi
                 rm -rf emqx
             done
    fi

+ 1 - 1
.ci/docker-compose-file/http-service/Dockerfile

@@ -1,7 +1,7 @@
 FROM tomcat:10.0.5
 
 RUN wget https://downloads.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip \
-	&& unzip apache-maven-3.6.3-bin.zip \
+	&& unzip -q apache-maven-3.6.3-bin.zip \
 	&& mv apache-maven-3.6.3 /opt/apache-maven-3.6.3/ \
 	&& ln -s /opt/apache-maven-3.6.3/ /opt/maven
 ENV M2_HOME=/opt/maven

+ 17 - 7
.github/workflows/build_packages.yaml

@@ -108,7 +108,7 @@ jobs:
         mkdir -p _packages/${{ matrix.profile }}
         Compress-Archive -Path _build/${{ matrix.profile }}/rel/emqx -DestinationPath _build/${{ matrix.profile }}/rel/$pkg_name
         mv _build/${{ matrix.profile }}/rel/$pkg_name _packages/${{ matrix.profile }}
-        Get-FileHash -Path "_packages/${{ matrix.profile }}/$pkg_name" | Format-List | grep 'Hash' | awk '{print $3}'  > _packages/${{ matrix.profile }}/$pkg_name.sha256
+        sha256sum "_packages/${{ matrix.profile }}/$pkg_name" | head -c 64 > "_packages/${{ matrix.profile }}/${pkg_name}.sha256"
     - name: run emqx
       timeout-minutes: 1
       run: |
@@ -195,8 +195,14 @@ jobs:
           exit 1
         fi
         ./emqx/bin/emqx_ctl status
-        ./emqx/bin/emqx stop
+        if ! ./emqx/bin/emqx stop; then
+          cat emqx/log/erlang.log.1 || true
+          cat emqx/log/emqx.log.1 || true
+          echo "failed to stop emqx"
+          exit 1
+        fi
         rm -rf emqx
+        #sha256sum ./_packages/${{ matrix.profile }}/$pkg_name | head -c64  > ./_packages/${{ matrix.profile }}/$pkg_name.sha256
         openssl dgst -sha256 ./_packages/${{ matrix.profile }}/$pkg_name | awk '{print $2}'  > ./_packages/${{ matrix.profile }}/$pkg_name.sha256
     - uses: actions/upload-artifact@v1
       if: startsWith(github.ref, 'refs/tags/')
@@ -340,8 +346,12 @@ jobs:
         profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
         registry:
           - 'docker.io'
-        include:
-          - profile: emqx
+          - 'public.ecr.aws'
+        exclude:
+          # we don't have an aws ecr repo for enterprise and edge yet
+          - profile: emqx-edge
+            registry: 'public.ecr.aws'
+          - profile: emqx-ee
             registry: 'public.ecr.aws'
 
     steps:
@@ -357,16 +367,16 @@ jobs:
         image: tonistiigi/binfmt:latest
         platforms: all
     - uses: aws-actions/configure-aws-credentials@v1
-      if: matrix.repository == 'public.ecr.aws'
+      if: matrix.registry == 'public.ecr.aws'
       with:
         aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
         aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
         aws-region: ${{ secrets.AWS_DEFAULT_REGION }}
     - name: Docker login to aws ecr
-      if: matrix.repository == 'public.ecr.aws'
+      if: matrix.registry == 'public.ecr.aws'
       run: aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws
     - uses: docker/login-action@v1
-      if: matrix.repository == 'docker.io'
+      if: matrix.registry == 'docker.io'
       with:
         username: ${{ secrets.DOCKER_HUB_USER }}
         password: ${{ secrets.DOCKER_HUB_TOKEN }}

+ 12 - 0
CHANGES-4.3.md

@@ -10,6 +10,18 @@ File format:
 - One list item per change topic
   Change log ends with a list of github PRs
 
+## v4.3.13
+
+### Enhancements
+
+* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
+  to force an immediate reload of all certificates after the files are updated on disk.
+
+### Bug fixes
+
+* Fix case where publishing to a non-existent topic alias would crash the connection [#6979]
+* Fix HTTP-API 500 error on querying the lwm2m client list on the another node [#7009]
+
 ## v4.3.12
 ### Important changes
 

+ 7 - 6
apps/emqx_lwm2m/src/emqx_lwm2m.app.src

@@ -1,7 +1,8 @@
 {application,emqx_lwm2m,
-             [{description,"EMQ X LwM2M Gateway"},
-              {vsn, "4.3.5"}, % strict semver, bump manually!
-              {modules,[]},
-              {registered,[emqx_lwm2m_sup]},
-              {applications,[kernel,stdlib,lwm2m_coap]},
-              {mod,{emqx_lwm2m_app,[]}}]}.
+ [{description,"EMQ X LwM2M Gateway"},
+  {vsn, "4.3.6"}, % strict semver, bump manually!
+  {modules,[]},
+  {registered,[emqx_lwm2m_sup]},
+  {applications,[kernel,stdlib,lwm2m_coap]},
+  {mod,{emqx_lwm2m_app,[]}}]
+}.

+ 10 - 6
apps/emqx_lwm2m/src/emqx_lwm2m.appup.src

@@ -4,13 +4,17 @@
     [{restart_application,emqx_lwm2m}]},
    {"4.3.2",
     [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
-     {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}]},
-   {"4.3.3",[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}]},
-   {"4.3.4",[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}]}],
+     {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
+     {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
+   {<<"4\\.3\\.[3-5]">>,
+    [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
+     {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}],
   [{<<"4\\.3\\.[0-1]">>,
     [{restart_application,emqx_lwm2m}]},
    {"4.3.2",
     [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
-     {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}]},
-   {"4.3.3",[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}]},
-   {"4.3.4",[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}]}]}.
+     {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
+     {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
+   {<<"4\\.3\\.[3-5]">>,
+    [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
+     {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}.

+ 2 - 2
apps/emqx_lwm2m/src/emqx_lwm2m_api.erl

@@ -51,7 +51,7 @@
         ]).
 
 list(#{node := Node }, Params) ->
-    case Node = node() of
+    case Node =:= node() of
         true -> list(#{}, Params);
         _ -> rpc_call(Node, list, [#{}, Params])
     end;
@@ -61,7 +61,7 @@ list(#{}, _Params) ->
     return({ok, format(Channels)}).
 
 lookup_cmd(#{ep := Ep, node := Node}, Params) ->
-    case Node = node() of
+    case Node =:= node() of
         true -> lookup_cmd(#{ep => Ep}, Params);
         _ -> rpc_call(Node, lookup_cmd, [#{ep => Ep}, Params])
     end;

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -1,6 +1,6 @@
 {application, emqx_management,
  [{description, "EMQ X Management API and CLI"},
-  {vsn, "4.3.10"}, % strict semver, bump manually!
+  {vsn, "4.3.11"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_management_sup]},
   {applications, [kernel,stdlib,minirest]},

+ 18 - 3
apps/emqx_management/src/emqx_mgmt.erl

@@ -51,6 +51,10 @@
         , set_quota_policy/2
         ]).
 
+-export([ clean_pem_cache/0
+        , clean_pem_cache/1
+        ]).
+
 %% Internal funcs
 -export([call_client/3]).
 
@@ -254,15 +258,17 @@ clean_acl_cache(Node, ClientId) ->
     rpc_call(Node, clean_acl_cache, [Node, ClientId]).
 
 clean_acl_cache_all() ->
-    Results = [{Node, clean_acl_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()],
-    case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of
+    for_nodes(fun clean_acl_cache_all/1).
+
+for_nodes(F) ->
+    Results = [{Node, F(Node)} || Node <- ekka_mnesia:running_nodes()],
+    case lists:filter(fun({_Node, Res}) -> Res =/= ok end, Results) of
         []  -> ok;
         BadNodes -> {error, BadNodes}
     end.
 
 clean_acl_cache_all(Node) when Node =:= node() ->
     emqx_acl_cache:drain_cache();
-
 clean_acl_cache_all(Node) ->
     rpc_call(Node, clean_acl_cache_all, [Node]).
 
@@ -272,6 +278,15 @@ set_ratelimit_policy(ClientId, Policy) ->
 set_quota_policy(ClientId, Policy) ->
     call_client(ClientId, {quota, Policy}).
 
+clean_pem_cache() ->
+    for_nodes(fun clean_pem_cache/1).
+
+clean_pem_cache(Node) when Node =:= node() ->
+    _ = ssl_pem_cache:clear(),
+    ok;
+clean_pem_cache(Node) ->
+    rpc_call(Node, ?FUNCTION_NAME, [Node]).
+
 %% @private
 call_client(ClientId, Req) ->
     Results = [call_client(Node, ClientId, Req) || Node <- ekka_mnesia:running_nodes()],

+ 31 - 14
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -40,6 +40,7 @@
         , mgmt/1
         , data/1
         , acl/1
+        , pem_cache/1
         ]).
 
 -define(PROC_INFOKEYS, [status,
@@ -576,21 +577,11 @@ data(_) ->
 %% @doc acl Command
 
 acl(["cache-clean", "node", Node]) ->
-    case emqx_mgmt:clean_acl_cache_all(erlang:list_to_existing_atom(Node)) of
-        ok ->
-            emqx_ctl:print("ACL cache drain started on node ~s.~n", [Node]);
-        {error, Reason} ->
-            emqx_ctl:print("ACL drain failed on node ~s: ~0p.~n", [Node, Reason])
-    end;
-
+    with_log(fun() -> for_node(fun emqx_mgmt:clean_acl_cache_all/1, Node) end,
+             "ACL cache drain start");
 acl(["cache-clean", "all"]) ->
-    case emqx_mgmt:clean_acl_cache_all() of
-        ok ->
-            emqx_ctl:print("Started ACL cache drain in all nodes~n");
-        {error, Reason} ->
-            emqx_ctl:print("ACL cache-clean failed: ~p.~n", [Reason])
-    end;
-
+    with_log(fun emqx_mgmt:clean_acl_cache_all/1,
+             "ACL cache drain start");
 acl(["cache-clean", ClientId]) ->
     emqx_mgmt:clean_acl_cache(ClientId);
 
@@ -600,6 +591,15 @@ acl(_) ->
                     {"acl cache-clean <ClientId>",      "Clears acl cache for given client"}
                    ]).
 
+pem_cache(["clean", "all"]) ->
+    with_log(fun emqx_mgmt:clean_pem_cache/0, "PEM cache clean");
+pem_cache(["clean", "node", Node]) ->
+    with_log(fun() -> for_node(fun emqx_mgmt:clean_pem_cache/1, Node) end, "PEM cache clean");
+pem_cache(_) ->
+    emqx_ctl:usage([{"pem_cache clean all",         "Clears x509 certificate cache on all nodes"},
+                    {"pem_cache clean node <Node>", "Clears x509 certificate cache on given node"}
+                   ]).
+
 %%--------------------------------------------------------------------
 %% Dump ETS
 %%--------------------------------------------------------------------
@@ -721,3 +721,20 @@ restart_http_listener(Scheme, AppName) ->
 
 http_mod_name(emqx_management) -> emqx_mgmt_http;
 http_mod_name(Name) -> Name.
+
+for_node(Fun, Node) ->
+    try list_to_existing_atom(Node) of
+        NodeAtom ->
+            Fun(NodeAtom)
+    catch
+        error : badarg ->
+            {error, unknown_node}
+    end.
+
+with_log(Fun, Msg) ->
+    case Fun() of
+        ok ->
+            emqx_ctl:print("~s OK~n", [Msg]);
+        {error, Reason} ->
+            emqx_ctl:print("~s FAILED~n~p~n", [Msg, Reason])
+    end.

+ 1 - 1
lib-ce/emqx_dashboard/src/emqx_dashboard.app.src

@@ -1,6 +1,6 @@
 {application, emqx_dashboard,
  [{description, "EMQ X Web Dashboard"},
-  {vsn, "4.3.8"}, % strict semver, bump manually!
+  {vsn, "4.3.9"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_dashboard_sup]},
   {applications, [kernel,stdlib,mnesia,minirest]},

+ 6 - 1
src/emqx.app.src

@@ -1,7 +1,12 @@
 {application, emqx,
  [{id, "emqx"},
   {description, "EMQ X"},
-  {vsn, "4.3.13"}, % strict semver, bump manually!
+  %% Note: this version is not the same as the release version!  This
+  %% is simply the emqx `application' version, which is separate from
+  %% the emqx `release' version, which in turn is comprised of several
+  %% apps, one of which is this.  See `emqx_release.hrl' for more
+  %% info.
+  {vsn, "4.3.14"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},

+ 4 - 2
src/emqx.appup.src

@@ -1,6 +1,7 @@
 %% -*- mode: erlang -*-
 {VSN,
-  [{"4.3.12",
+  [{"4.3.13",[{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
+   {"4.3.12",
     [{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
@@ -335,7 +336,8 @@
      {load_module,emqx_message,brutal_purge,soft_purge,[]},
      {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
    {<<".*">>,[]}],
-  [{"4.3.12",
+  [{"4.3.13",[{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
+   {"4.3.12",
     [{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
      {load_module,emqx_channel,brutal_purge,soft_purge,[]},

+ 2 - 3
src/emqx_channel.erl

@@ -1341,7 +1341,7 @@ process_alias(Packet = #mqtt_packet{
         {ok, Topic} ->
             NPublish = Publish#mqtt_packet_publish{topic_name = Topic},
             {ok, Packet#mqtt_packet{variable = NPublish}, Channel};
-        false -> {error, ?RC_PROTOCOL_ERROR}
+        error -> {error, ?RC_PROTOCOL_ERROR}
     end;
 
 process_alias(#mqtt_packet{
@@ -1685,7 +1685,7 @@ run_hooks(Name, Args, Acc) ->
 
 -compile({inline, [find_alias/3, save_alias/4]}).
 
-find_alias(_, _, undefined) -> false;
+find_alias(_, _, undefined) -> error;
 find_alias(inbound, AliasId, _TopicAliases = #{inbound := Aliases}) ->
     maps:find(AliasId, Aliases);
 find_alias(outbound, Topic, _TopicAliases = #{outbound := Aliases}) ->
@@ -1739,4 +1739,3 @@ flag(false) -> 0.
 set_field(Name, Value, Channel) ->
     Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
     setelement(Pos+1, Channel, Value).
-

+ 21 - 0
test/emqx_channel_SUITE.erl

@@ -674,6 +674,13 @@ t_process_alias(_) ->
     {ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} =
         emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
 
+t_process_alias_inexistent_alias(_) ->
+    Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
+    Channel = channel(),
+    ?assertEqual(
+      {error, ?RC_PROTOCOL_ERROR},
+      emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel)).
+
 t_packing_alias(_) ->
     Packet1 = #mqtt_packet{variable = #mqtt_packet_publish{
                                          topic_name = <<"x">>,
@@ -710,6 +717,20 @@ t_packing_alias(_) ->
                    #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}},
                    channel())).
 
+t_packing_alias_inexistent_alias(_) ->
+    Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
+    Channel = channel(),
+    Packet = #mqtt_packet{variable = Publish},
+    ExpectedChannel = emqx_channel:set_field(
+                        topic_aliases,
+                        #{ inbound => #{}
+                         , outbound => #{<<>> => 1}
+                         },
+                        Channel),
+    ?assertEqual(
+      {Packet, ExpectedChannel},
+      emqx_channel:packing_alias(Packet, Channel)).
+
 t_check_pub_acl(_) ->
     ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end),
     Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),