Просмотр исходного кода

Merge pull request #1 from emqx/main-v4.3

Main v4.3
xiangfangyang-tech 4 лет назад
Родитель
Сommit
8fb9d27aa1

+ 3 - 3
.github/workflows/build_packages.yaml

@@ -468,7 +468,7 @@ jobs:
           -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
           -H "Accept: application/vnd.github.v3+json" \
           -X POST \
-          -d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \
+          -d "{\"ref\":\"v1.0.3\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ee\": \"true\"}}" \
           "https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
     - name: update repo.emqx.io
       if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
@@ -477,7 +477,7 @@ jobs:
           -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
           -H "Accept: application/vnd.github.v3+json" \
           -X POST \
-          -d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
+          -d "{\"ref\":\"v1.0.3\",\"inputs\":{\"version\": \"${{ env.version }}\", \"emqx_ce\": \"true\"}}" \
           "https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_repos.yaml/dispatches"
     - name: update homebrew packages
       if: github.event_name == 'release' && endsWith(github.repository, 'emqx') && matrix.profile == 'emqx'
@@ -487,7 +487,7 @@ jobs:
               -H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
               -H "Accept: application/vnd.github.v3+json" \
               -X POST \
-              -d "{\"ref\":\"v1.0.2\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
+              -d "{\"ref\":\"v1.0.3\",\"inputs\":{\"version\": \"${{ env.version }}\"}}" \
               "https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches"
         fi
     - uses: geekyeggo/delete-artifact@v1

+ 30 - 14
.github/workflows/run_automate_tests.yaml

@@ -1,5 +1,5 @@
 name: Integration Test Suites
-  
+
 on:
   push:
     tags:
@@ -12,18 +12,30 @@ jobs:
   build:
     runs-on: ubuntu-latest
     outputs:
+      imgname: ${{ steps.build_docker.outputs.imgname}}
       version: ${{ steps.build_docker.outputs.version}}
     steps:
     - uses: actions/checkout@v2
     - name: build docker
       id: build_docker
       run: |
+        if [ -f EMQX_ENTERPRISE ]; then
+          echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
+          git config --global credential.helper store
+          echo "${{ secrets.CI_GIT_TOKEN }}" >> scripts/git-token
+          make deps-emqx-ee
+        fi
         make docker
         echo "::set-output name=version::$(./pkg-vsn.sh)"
+        if [ -f EMQX_ENTERPRISE ]; then
+          echo "::set-output name=imgname::emqx-ee"
+        else
+          echo "::set-output name=imgname::emqx"
+        fi
     - uses: actions/upload-artifact@v2
       with:
         name: emqx-docker-image-zip
-        path: _packages/emqx/emqx-docker-${{ steps.build_docker.outputs.version }}.zip
+        path: _packages/${{ steps.build_docker.outputs.imgname }}/${{ steps.build_docker.outputs.imgname }}-docker-${{ steps.build_docker.outputs.version }}.zip
 
   webhook:
     runs-on: ubuntu-latest
@@ -43,14 +55,15 @@ jobs:
         path: /tmp
     - name: load docker image
       env:
+        imgname: ${{ needs.build.outputs.imgname}}
         version: ${{ needs.build.outputs.version }}
       run: |
-        unzip -q /tmp/emqx-docker-${version}.zip -d /tmp
-        docker load < /tmp/emqx-docker-${version}
+        unzip -q /tmp/${imgname}-docker-${version}.zip -d /tmp
+        docker load < /tmp/${imgname}-docker-${version}
     - name: docker compose up
       timeout-minutes: 5
       env:
-        TARGET: emqx/emqx
+        TARGET: emqx/${{ needs.build.outputs.imgname }}
         EMQX_TAG: ${{ needs.build.outputs.version }}
       run: |
         docker-compose \
@@ -142,14 +155,15 @@ jobs:
         path: /tmp
     - name: load docker image
       env:
+        imgname: ${{ needs.build.outputs.imgname }}
         version: ${{ needs.build.outputs.version }}
       run: |
-        unzip -q /tmp/emqx-docker-${version}.zip -d /tmp
-        docker load < /tmp/emqx-docker-${version}
+        unzip -q /tmp/${imgname}-docker-${version}.zip -d /tmp
+        docker load < /tmp/${imgname}-docker-${version}
     - name: docker compose up
       timeout-minutes: 5
       env:
-        TARGET: emqx/emqx
+        TARGET: emqx/${{ needs.build.outputs.imgname }}
         EMQX_TAG: ${{ needs.build.outputs.version }}
         MYSQL_TAG: ${{ matrix.mysql_tag }}
       run: |
@@ -248,14 +262,15 @@ jobs:
         path: /tmp
     - name: load docker image
       env:
+        imgname: ${{ needs.build.outputs.imgname }}
         version: ${{ needs.build.outputs.version }}
       run: |
-        unzip -q /tmp/emqx-docker-${version}.zip -d /tmp
-        docker load < /tmp/emqx-docker-${version}
+        unzip -q /tmp/${imgname}-docker-${version}.zip -d /tmp
+        docker load < /tmp/${imgname}-docker-${version}
     - name: docker compose up
       timeout-minutes: 5
       env:
-        TARGET: emqx/emqx
+        TARGET: emqx/${{ needs.build.outputs.imgname }}
         EMQX_TAG: ${{ needs.build.outputs.version }}
         PGSQL_TAG: ${{ matrix.pgsql_tag }}
       run: |
@@ -343,14 +358,15 @@ jobs:
         path: /tmp
     - name: load docker image
       env:
+        imgname: ${{ needs.build.outputs.imgname }}
         version: ${{ needs.build.outputs.version }}
       run: |
-        unzip -q /tmp/emqx-docker-${version}.zip -d /tmp
-        docker load < /tmp/emqx-docker-${version}
+        unzip -q /tmp/${imgname}-docker-${version}.zip -d /tmp
+        docker load < /tmp/${imgname}-docker-${version}
     - name: docker compose up
       timeout-minutes: 5
       env:
-        TARGET: emqx/emqx
+        TARGET: emqx/${{ needs.build.outputs.imgname }}
         EMQX_TAG: ${{ needs.build.outputs.version }}
         MYSQL_TAG: 8
       run: |

+ 1 - 1
apps/emqx_auth_mongo/rebar.config

@@ -1,6 +1,6 @@
 {deps,
   %% NOTE: mind poolboy version when updating mongodb-erlang version
- [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.7"}}},
+ [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}},
   %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git
   %% (which has overflow_ttl feature added).
   %% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07).

+ 2 - 2
apps/emqx_management/src/emqx_management.appup.src

@@ -1,13 +1,13 @@
 %% -*- mode: erlang -*-
 {VSN,
- [ {<<"4\\.3\\.[0-9]+">>,
+ [ {<<"4\\.3\\.[0-7]+">>,
     [ {apply,{minirest,stop_http,['http:management']}},
       {apply,{minirest,stop_http,['https:management']}},
       {restart_application, emqx_management}
     ]},
    {<<".*">>, []}
  ],
- [ {<<"4\\.3\\.[0-9]+">>,
+ [ {<<"4\\.3\\.[0-7]+">>,
     [ {apply,{minirest,stop_http,['http:management']}},
       {apply,{minirest,stop_http,['https:management']}},
       {restart_application, emqx_management}

+ 2 - 4
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -191,10 +191,8 @@ clients(["show", ClientId]) ->
     if_client(ClientId, fun print/1);
 
 clients(["kick", ClientId]) ->
-    case emqx_cm:kick_session(bin(ClientId)) of
-        ok -> emqx_ctl:print("ok~n");
-        _ -> emqx_ctl:print("Not Found.~n")
-    end;
+    ok = emqx_cm:kick_session(bin(ClientId)),
+    emqx_ctl:print("ok~n");
 
 clients(_) ->
     emqx_ctl:usage([{"clients list",            "List all clients"},

+ 2 - 2
apps/emqx_management/test/emqx_mgmt_SUITE.erl

@@ -158,9 +158,9 @@ t_clients_cmd(_) ->
     timer:sleep(300),
     emqx_mgmt_cli:clients(["list"]),
     ?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client12"]), "client12")),
-    ?assertEqual((emqx_mgmt_cli:clients(["kick", "client12"])), "ok~n"),
+    ?assertEqual("ok~n", emqx_mgmt_cli:clients(["kick", "client12"])),
     timer:sleep(500),
-    ?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client12"]), "Not Found")),
+    ?assertEqual("ok~n", emqx_mgmt_cli:clients(["kick", "client12"])),
     receive
         {'EXIT', T, _} ->
             ok

+ 2 - 2
apps/emqx_management/test/emqx_mgmt_api_SUITE.erl

@@ -223,8 +223,8 @@ t_clients(_) ->
 
     timer:sleep(300),
 
-    {ok, NotFound0} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
-    ?assertEqual(?ERROR12, get(<<"code">>, NotFound0)),
+    {ok, Ok1} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
+    ?assertEqual(?SUCCESS, get(<<"code">>, Ok1)),
 
     {ok, Clients6} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()),
     ?assertEqual(1, maps:get(<<"count">>, get(<<"meta">>, Clients6))),

+ 1 - 1
apps/emqx_web_hook/src/emqx_web_hook_actions.erl

@@ -57,7 +57,7 @@
                          type => string,
                          default => <<"5s">>,
                          title => #{en => <<"Request Timeout">>,
-                                    zh => <<"请求超时时间时间"/utf8>>},
+                                    zh => <<"请求超时时间"/utf8>>},
                          description => #{en => <<"Request Timeout In Seconds">>,
                                           zh => <<"请求超时时间"/utf8>>}},
     pool_size => #{order => 4,

+ 35 - 12
bin/emqx

@@ -20,6 +20,41 @@ mkdir -p "$RUNNER_LOG_DIR"
 # Make sure data directory exists
 mkdir -p "$RUNNER_DATA_DIR"
 
+export ROOTDIR="$RUNNER_ROOT_DIR"
+export ERTS_DIR="$ROOTDIR/erts-$ERTS_VSN"
+export BINDIR="$ERTS_DIR/bin"
+export EMU="beam"
+export PROGNAME="erl"
+DYNLIBS_DIR="$RUNNER_ROOT_DIR/dynlibs"
+ERTS_LIB_DIR="$ERTS_DIR/../lib"
+MNESIA_DATA_DIR="$RUNNER_DATA_DIR/mnesia/$NAME"
+
+# Echo to stderr on errors
+echoerr() { echo "$*" 1>&2; }
+
+check_eralng_start() {
+    "$BINDIR/$PROGNAME" -noshell -boot "$REL_DIR/start_clean" -s crypto start -s init stop
+}
+
+if ! check_eralng_start >/dev/null 2>&1; then
+    BUILT_ON="$(head -1 "${REL_DIR}/BUILT_ON")"
+    ## failed to start, might be due to missing libs, try to be portable
+    export LD_LIBRARY_PATH="$DYNLIBS_DIR:$LD_LIBRARY_PATH"
+    if ! check_eralng_start; then
+        ## it's hopeless
+        echoerr "FATAL: Unable to start Erlang (with libcrypto)."
+        echoerr "Please make sure it's running on the correct platform with all required dependencies."
+        echoerr "This EMQ X release is built for $BUILT_ON"
+        exit 1
+    fi
+    echoerr "WARNING: There seem to be missing dynamic libs from the OS. Using libs from ${DYNLIBS_DIR}"
+fi
+
+## backward compatible
+if [ -d "$ERTS_DIR/lib" ]; then
+    export LD_LIBRARY_PATH="$ERTS_DIR/lib:$LD_LIBRARY_PATH"
+fi
+
 # cuttlefish try to read environment variables starting with "EMQX_"
 export CUTTLEFISH_ENV_OVERRIDE_PREFIX='EMQX_'
 
@@ -120,9 +155,6 @@ if [ "$ULIMIT_F" -lt 1024 ]; then
     echo "!!!!"
 fi
 
-# Echo to stderr on errors
-echoerr() { echo "$@" 1>&2; }
-
 # By default, use cuttlefish to generate app.config and vm.args
 CUTTLEFISH="${USE_CUTTLEFISH:-yes}"
 
@@ -364,15 +396,6 @@ else
     PROTO_DIST_ARG="-proto_dist $PROTO_DIST"
 fi
 
-export ROOTDIR="$RUNNER_ROOT_DIR"
-export ERTS_DIR="$ROOTDIR/erts-$ERTS_VSN"
-export BINDIR="$ERTS_DIR/bin"
-export EMU="beam"
-export PROGNAME="erl"
-export LD_LIBRARY_PATH="$ERTS_DIR/lib:$LD_LIBRARY_PATH"
-ERTS_LIB_DIR="$ERTS_DIR/../lib"
-MNESIA_DATA_DIR="$RUNNER_DATA_DIR/mnesia/$NAME"
-
 cd "$ROOTDIR"
 
 # User can specify an sname without @hostname

+ 18 - 14
build

@@ -15,18 +15,7 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")"
 PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}"
 export PKG_VSN
 
-if [ "$(uname -s)" = 'Darwin' ]; then
-	SYSTEM=macos
-elif [ "$(uname -s)" = 'Linux' ]; then
-    if grep -q -i 'centos' /etc/*-release; then
-        DIST='centos'
-        VERSION_ID="$(rpm --eval '%{centos_ver}')"
-    else
-        DIST="$(sed -n '/^ID=/p' /etc/os-release | sed -r 's/ID=(.*)/\1/g' | sed 's/"//g')"
-        VERSION_ID="$(sed -n '/^VERSION_ID=/p' /etc/os-release | sed -r 's/VERSION_ID=(.*)/\1/g' | sed 's/"//g')"
-    fi
-    SYSTEM="$(echo "${DIST}${VERSION_ID}" | sed -r 's/([a-zA-Z]*)-.*/\1/g')"
-fi
+SYSTEM="$(./scripts/get-distro.sh)"
 
 ARCH="$(uname -m)"
 case "$ARCH" in
@@ -46,8 +35,8 @@ export ARCH
 ## Support RPM and Debian based linux systems
 ##
 if [ "$(uname -s)" = 'Linux' ]; then
-    case "${DIST:-}" in
-        ubuntu|debian|raspbian)
+    case "${SYSTEM:-}" in
+        ubuntu*|debian*|raspbian*)
             PKGERDIR='deb'
             ;;
         *)
@@ -98,6 +87,18 @@ make_relup() {
     ./rebar3 as "$PROFILE" relup --relname emqx --relvsn "${PKG_VSN}"
 }
 
+cp_dyn_libs() {
+    local rel_dir="$1"
+    local target_dir="${rel_dir}/dynlibs"
+    if ! [ "$(uname -s)" = 'Linux' ]; then
+        return 0;
+    fi
+    mkdir -p "$target_dir"
+    while read -r so_file; do
+        cp -L "$so_file" "$target_dir/"
+    done < <(find "$rel_dir" -type f \( -name "*.so*" -o -name "beam.smp" \) -print0 | xargs -0 ldd | grep -E '^\s+.*=>\s(/lib|/usr)' | awk '{print $3}')
+}
+
 ## make_zip turns .tar.gz into a .zip with a slightly different name.
 ## It assumes the .tar.gz has been built -- relies on Makefile dependency
 make_zip() {
@@ -117,6 +118,9 @@ make_zip() {
     local zipball
     zipball="${pkgpath}/${PROFILE}-${SYSTEM}-${PKG_VSN}-${ARCH}.zip"
     tar zxf "${tarball}" -C "${tard}/emqx"
+    ## try to be portable for zip packages.
+    ## for DEB and RPM packages the dependencies are resoved by yum and apt
+    cp_dyn_libs "${tard}/emqx"
     (cd "${tard}" && zip -qr - emqx) > "${zipball}"
 }
 

+ 1 - 1
deploy/docker/Dockerfile

@@ -41,7 +41,7 @@ LABEL org.label-schema.docker.dockerfile="Dockerfile" \
     org.label-schema.url="https://emqx.io" \
     org.label-schema.vcs-type="Git" \
     org.label-schema.vcs-url="https://github.com/emqx/emqx" \
-    maintainer="Raymond M Mouthaan <raymondmmouthaan@gmail.com>, Huang Rui <vowstar@gmail.com>, EMQ X Team <support@emqx.io>"
+    maintainer="EMQ X Team <support@emqx.io>"
 
 ARG QEMU_ARCH=x86_64
 ARG EMQX_NAME=emqx

+ 12 - 10
docker.mk

@@ -1,8 +1,10 @@
 #!/usr/bin/make -f
 # -*- makefile -*-
 
-## default globals
-TARGET ?= emqx/emqx
+## default globals.
+## when built with `make docker` command the default profile is either emqx or emqx-ee (for enterprise)
+## or the TARGET varialbe can be set beforehand to force a different name
+TARGET ?= emqx/$(PROFILE)
 QEMU_ARCH ?= x86_64
 ARCH ?= amd64
 QEMU_VERSION ?= v5.0.0-2
@@ -37,7 +39,7 @@ docker-prepare:
 	# enable experimental to use docker manifest command
 	@echo '{ "experimental": "enabled" }' | tee $$HOME/.docker/config.json
 	# enable experimental
-	@echo '{ "experimental": true, "storage-driver": "overlay2", "max-concurrent-downloads": 50, "max-concurrent-uploads": 50 }' | tee /etc/docker/daemon.json 
+	@echo '{ "experimental": true, "storage-driver": "overlay2", "max-concurrent-downloads": 50, "max-concurrent-uploads": 50 }' | tee /etc/docker/daemon.json
 	@service docker restart
 
 .PHONY: docker-build
@@ -85,7 +87,7 @@ docker-tag:
 
 .PHONY: docker-save
 docker-save:
-	@echo "DOCKER SAVE: Save Docker image." 
+	@echo "DOCKER SAVE: Save Docker image."
 
 	@mkdir -p _packages/$(EMQX_NAME)
 
@@ -94,7 +96,7 @@ docker-save:
 		zip -r -m $(EMQX_NAME)-docker-$(PKG_VSN).zip $(EMQX_NAME)-docker-$(PKG_VSN); \
 		mv ./$(EMQX_NAME)-docker-$(PKG_VSN).zip _packages/$(EMQX_NAME)/$(EMQX_NAME)-docker-$(PKG_VSN).zip; \
 	fi
-	
+
 	@for arch in $(ARCH_LIST); do \
 		if [ -n  "$$(docker images -q  $(TARGET):$(PKG_VSN)-$(OS)-$${arch})" ]; then \
 			docker save  $(TARGET):$(PKG_VSN)-$(OS)-$${arch} > $(EMQX_NAME)-docker-$(PKG_VSN)-$(OS)-$${arch}; \
@@ -105,8 +107,8 @@ docker-save:
 
 .PHONY: docker-push
 docker-push:
-	@echo "DOCKER PUSH: Push Docker image."; 
-	@echo "DOCKER PUSH: pushing - $(TARGET):$(PKG_VSN)."; 
+	@echo "DOCKER PUSH: Push Docker image.";
+	@echo "DOCKER PUSH: pushing - $(TARGET):$(PKG_VSN).";
 
 	@if [ -n "$$(docker images -q $(TARGET):$(PKG_VSN))" ]; then \
 		docker push $(TARGET):$(PKG_VSN); \
@@ -131,7 +133,7 @@ docker-manifest-list:
 		fi; \
 	done; \
 	eval $$version; \
-	eval $$latest; 
+	eval $$latest;
 
 	for arch in $(ARCH_LIST); do \
 		case $${arch} in \
@@ -166,10 +168,10 @@ docker-manifest-list:
 				fi; \
 				;; \
 		esac; \
-	done; 
+	done;
 
 	docker manifest inspect $(TARGET):$(PKG_VSN)
-	docker manifest push $(TARGET):$(PKG_VSN); 
+	docker manifest push $(TARGET):$(PKG_VSN);
 	docker manifest inspect $(TARGET):latest
 	docker manifest push $(TARGET):latest;
 

+ 1 - 1
etc/BUILT_ON

@@ -1 +1 @@
-{{built_on_arch}}
+{{built_on_platform}}

+ 1 - 1
include/emqx_release.hrl

@@ -29,7 +29,7 @@
 
 -ifndef(EMQX_ENTERPRISE).
 
--define(EMQX_RELEASE, {opensource, "4.3.8"}).
+-define(EMQX_RELEASE, {opensource, "4.3.9"}).
 
 -else.
 

+ 4 - 6
lib-ce/emqx_dashboard/src/emqx_dashboard.appup.src

@@ -1,20 +1,18 @@
 %% -*- mode: erlang -*-
 {VSN,
- [ {<<"4.3.[0-9]">>,
+ [ {<<".*">>,
     %% load all plugins
     %% NOTE: this depends on the fact that emqx_dashboard is always
     %% the last application gets upgraded
     [ {apply, {emqx_rule_engine, load_providers, []}}
     , {restart_application, emqx_dashboard}
     , {apply, {emqx_plugins, load, []}}
-    ]},
-   {<<".*">>, []}
+    ]}
  ],
- [ {<<"4.3.[0-9]">>,
+ [ {<<".*">>,
     [ {apply, {emqx_rule_engine, load_providers, []}}
     , {restart_application, emqx_dashboard}
     , {apply, {emqx_plugins, load, []}}
-    ]},
-   {<<".*">>, []}
+    ]}
  ]
 }.

+ 14 - 1
rebar.config.erl

@@ -173,11 +173,24 @@ relx(Vsn, RelType, PkgType) ->
     , {vm_args,false}
     , {release, {emqx, Vsn}, relx_apps(RelType)}
     , {overlay, relx_overlay(RelType)}
-    , {overlay_vars, [ {built_on_arch, rebar_utils:get_arch()}
+    , {overlay_vars, [ {built_on_platform, built_on()}
                      , {emqx_description, emqx_description(RelType, IsEnterprise)}
                      | overlay_vars(RelType, PkgType, IsEnterprise)]}
     ].
 
+built_on() ->
+    On = rebar_utils:get_arch(),
+    case distro() of
+        false -> On;
+        Distro -> On ++ "-" ++ Distro
+    end.
+
+distro() ->
+    case os:type() of
+        {unix, _} -> string:strip(os:cmd("scripts/get-distro.sh"), both, $\n);
+        _ -> false
+    end.
+
 emqx_description(cloud, true) -> "EMQ X Enterprise";
 emqx_description(cloud, false) -> "EMQ X Broker";
 emqx_description(edge, _) -> "EMQ X Edge".

+ 19 - 0
scripts/get-distro.sh

@@ -0,0 +1,19 @@
+#!/bin/bash
+
+## This script prints Linux distro name and its version number
+## e.g. macos, centos8, ubuntu20.04
+
+set -euo pipefail
+
+if [ "$(uname -s)" = 'Darwin' ]; then
+	echo 'macos'
+elif [ "$(uname -s)" = 'Linux' ]; then
+    if grep -q -i 'centos' /etc/*-release; then
+        DIST='centos'
+        VERSION_ID="$(rpm --eval '%{centos_ver}')"
+    else
+        DIST="$(sed -n '/^ID=/p' /etc/os-release | sed -r 's/ID=(.*)/\1/g' | sed 's/"//g')"
+        VERSION_ID="$(sed -n '/^VERSION_ID=/p' /etc/os-release | sed -r 's/VERSION_ID=(.*)/\1/g' | sed 's/"//g')"
+    fi
+    echo "${DIST}${VERSION_ID}" | sed -r 's/([a-zA-Z]*)-.*/\1/g'
+fi

+ 36 - 14
src/emqx.appup.src

@@ -1,21 +1,27 @@
 %% -*- mode: erlang -*-
 {VSN,
   [{"4.3.9",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.8",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.7",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@@ -24,7 +30,9 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.6",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
@@ -34,7 +42,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.5",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@@ -46,7 +55,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.4",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@@ -59,7 +69,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.3",
-    [{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
      {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
@@ -137,21 +148,27 @@
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {<<".*">>,[]}],
   [{"4.3.9",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.8",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_frame,brutal_purge,soft_purge,[]},
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.7",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@@ -160,7 +177,9 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.6",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_cm,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@@ -170,7 +189,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.5",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@@ -182,7 +202,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.4",
-    [{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@@ -195,7 +216,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.3",
-    [{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+     {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},

+ 5 - 2
src/emqx_channel.erl

@@ -977,8 +977,11 @@ handle_info({sock_closed, Reason}, Channel =
         Shutdown -> Shutdown
     end;
 
-handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
-    ?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
+handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
+    %% Since sock_closed messages can be generated multiple times,
+    %% we can simply ignore errors of this type in the disconnected state.
+    %% e.g. when the socket send function returns an error, there is already
+    %% a tcp_closed delivered to the process mailbox
     {ok, Channel};
 
 handle_info(clean_acl_cache, Channel) ->

+ 97 - 53
src/emqx_cm.erl

@@ -72,7 +72,7 @@
         ]).
 
 %% Internal export
--export([stats_fun/0]).
+-export([stats_fun/0, clean_down/1]).
 
 -type(chan_pid() :: pid()).
 
@@ -93,7 +93,9 @@
 %% Server name
 -define(CM, ?MODULE).
 
--define(T_TAKEOVER, 15000).
+-define(T_KICK, 5_000).
+-define(T_GET_INFO, 5_000).
+-define(T_TAKEOVER, 15_000).
 
 %% @doc Start the channel manager.
 -spec(start_link() -> startlink_ret()).
@@ -164,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
         error:badarg -> undefined
     end;
 get_chan_info(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
+    rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
 
 %% @doc Update infos of the channel.
 -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
@@ -189,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
         error:badarg -> undefined
     end;
 get_chan_stats(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
+    rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
 
 %% @doc Set channel's stats.
 -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
@@ -257,7 +259,7 @@ takeover_session(ClientId) ->
             takeover_session(ClientId, ChanPid);
         ChanPids ->
             [ChanPid|StalePids] = lists:reverse(ChanPids),
-            ?LOG(error, "More than one channel found: ~p", [ChanPids]),
+            ?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]),
             lists:foreach(fun(StalePid) ->
                                   catch discard_session(ClientId, StalePid)
                           end, StalePids),
@@ -269,77 +271,113 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
         undefined ->
             {error, not_found};
         ConnMod when is_atom(ConnMod) ->
+            %% TODO: if takeover times out, maybe kill the old?
             Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
             {ok, ConnMod, ChanPid, Session}
     end;
-
 takeover_session(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]).
+    rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
 
 %% @doc Discard all the sessions identified by the ClientId.
 -spec(discard_session(emqx_types:clientid()) -> ok).
 discard_session(ClientId) when is_binary(ClientId) ->
     case lookup_channels(ClientId) of
         [] -> ok;
-        ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
+        ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
     end.
 
-do_discard_session(ClientId, Pid) ->
+%% @private Kick a local stale session to force it step down.
+%% If failed to kick (e.g. timeout) force a kill.
+%% Keeping the stale pid around, or returning error or raise an exception
+%% benefits nobody.
+-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
+kick_or_kill(Action, ConnMod, Pid) ->
     try
-        discard_session(ClientId, Pid)
+        %% this is essentailly a gen_server:call implemented in emqx_connection
+        %% and emqx_ws_connection.
+        %% the handle_call is implemented in emqx_channel
+        ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
     catch
         _ : noproc -> % emqx_ws_connection: call
-            ?tp(debug, "session_already_gone", #{pid => Pid}),
-            ok;
+            ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
         _ : {noproc, _} -> % emqx_connection: gen_server:call
-            ?tp(debug, "session_already_gone", #{pid => Pid}),
-            ok;
-        _ : {'EXIT', {noproc, _}} -> % rpc_call/3
-            ?tp(debug, "session_already_gone", #{pid => Pid}),
-            ok;
+            ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
+        _ : {shutdown, _} ->
+            ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
         _ : {{shutdown, _}, _} ->
-            ?tp(debug, "session_already_shutdown", #{pid => Pid}),
-            ok;
+            ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
+        _ : {timeout, {gen_server, call, _}} ->
+            ?tp(warning, "session_kick_timeout",
+                #{pid => Pid,
+                  action => Action,
+                  stale_channel => stale_channel_info(Pid)
+                 }),
+            ok = force_kill(Pid);
         _ : Error : St ->
-            ?tp(error, "failed_to_discard_session",
-                #{pid => Pid, reason => Error, stacktrace=>St})
+            ?tp(error, "session_kick_exception",
+                #{pid => Pid,
+                  action => Action,
+                  reason => Error,
+                  stacktrace => St,
+                  stale_channel => stale_channel_info(Pid)
+                 }),
+            ok = force_kill(Pid)
     end.
 
-discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
+force_kill(Pid) ->
+    exit(Pid, kill),
+    ok.
+
+stale_channel_info(Pid) ->
+    process_info(Pid, [status, message_queue_len, current_stacktrace]).
+
+discard_session(ClientId, ChanPid) ->
+    kick_session(discard, ClientId, ChanPid).
+
+kick_session(ClientId, ChanPid) ->
+    kick_session(kick, ClientId, ChanPid).
+
+%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
+kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
     case get_chann_conn_mod(ClientId, ChanPid) of
-        undefined -> ok;
+        undefined ->
+            %% already deregistered
+            ok;
         ConnMod when is_atom(ConnMod) ->
-            ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
+            ok = kick_or_kill(Action, ConnMod, ChanPid)
     end;
-
-discard_session(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]).
+kick_session(Action, ClientId, ChanPid) ->
+    %% call remote node on the old APIs because we do not know if they have upgraded
+    %% to have kick_session/3
+    Function = case Action of
+                   discard -> discard_session;
+                   kick -> kick_session
+               end,
+    try
+        rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
+    catch
+        Error : Reason ->
+            %% This should mostly be RPC failures.
+            %% However, if the node is still running the old version
+            %% code (prior to emqx app 4.3.10) some of the RPC handler
+            %% exceptions may get propagated to a new version node
+            ?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p",
+                 [node(ChanPid), Action, Error, Reason])
+    end.
 
 kick_session(ClientId) ->
     case lookup_channels(ClientId) of
-        [] -> {error, not_found};
-        [ChanPid] ->
-            kick_session(ClientId, ChanPid);
+        [] ->
+            ?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]),
+            ok;
         ChanPids ->
-            [ChanPid|StalePids] = lists:reverse(ChanPids),
-            ?LOG(error, "More than one channel found: ~p", [ChanPids]),
-            lists:foreach(fun(StalePid) ->
-                                  catch discard_session(ClientId, StalePid)
-                          end, StalePids),
-            kick_session(ClientId, ChanPid)
+            case length(ChanPids) > 1 of
+                true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]);
+                false -> ok
+            end,
+            lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
     end.
 
-kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
-    case get_chan_info(ClientId, ChanPid) of
-        #{conninfo := #{conn_mod := ConnMod}} ->
-            ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
-        undefined ->
-            {error, not_found}
-    end;
-
-kick_session(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]).
-
 %% @doc Is clean start?
 % is_clean_start(#{clean_start := false}) -> false;
 % is_clean_start(_Attrs) -> true.
@@ -375,10 +413,16 @@ lookup_channels(local, ClientId) ->
     [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
 
 %% @private
-rpc_call(Node, Fun, Args) ->
-    case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
-        {badrpc, Reason} -> error(Reason);
-        Res -> Res
+rpc_call(Node, Fun, Args, Timeout) ->
+    case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
+        {badrpc, Reason} ->
+            %% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler
+            %% should catch all exceptions and always return 'ok'.
+            %% This leaves 'badrpc' only possible when there is problem
+            %% calling the remote node.
+            error({badrpc, Reason});
+        Res ->
+            Res
     end.
 
 %% @private
@@ -411,7 +455,7 @@ handle_cast(Msg, State) ->
 handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
     ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
     {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
-    ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
+    ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
     {noreply, State#{chan_pmon := PMon1}};
 
 handle_info(Info, State) ->
@@ -447,5 +491,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
         error:badarg -> undefined
     end;
 get_chann_conn_mod(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
+    rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
 

+ 92 - 37
test/emqx_cm_SUITE.erl

@@ -32,6 +32,12 @@
                      conn_mod => emqx_connection,
                      receive_maximum => 100}}).
 
+-define(WAIT(PATTERN, TIMEOUT, RET),
+        fun() ->
+                receive PATTERN -> RET
+                after TIMEOUT -> error({timeout, ?LINE}) end
+        end()).
+
 %%--------------------------------------------------------------------
 %% CT callbacks
 %%--------------------------------------------------------------------
@@ -180,25 +186,95 @@ t_open_session_race_condition(_) ->
     ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync
     ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
 
-t_discard_session(_) ->
+t_kick_session_discard_normal(_) ->
+    test_kick_session(discard, normal).
+
+t_kick_session_discard_shutdown(_) ->
+    test_kick_session(discard, shutdown).
+
+t_kick_session_discard_shutdown_with_reason(_) ->
+    test_kick_session(discard, {shutdown, discard}).
+
+t_kick_session_discard_timeout(_) ->
+    test_kick_session(discard, timeout).
+
+t_kick_session_discard_noproc(_) ->
+    test_kick_session(discard, noproc).
+
+t_kick_session_kick_normal(_) ->
+    test_kick_session(discard, normal).
+
+t_kick_session_kick_shutdown(_) ->
+    test_kick_session(discard, shutdown).
+
+t_kick_session_kick_shutdown_with_reason(_) ->
+    test_kick_session(discard, {shutdown, discard}).
+
+t_kick_session_kick_timeout(_) ->
+    test_kick_session(discard, timeout).
+
+t_kick_session_kick_noproc(_) ->
+    test_kick_session(discard, noproc).
+
+test_kick_session(Action, Reason) ->
     ClientId = rand_client_id(),
     #{conninfo := ConnInfo} = ?ChanInfo,
-    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
+    FakeSessionFun =
+        fun Loop() ->
+                     receive
+                         {'$gen_call', From, A} when A =:= kick orelse
+                                                     A =:= discard ->
+                             case Reason of
+                                 normal ->
+                                     gen_server:reply(From, ok);
+                                 timeout ->
+                                     %% no response to the call
+                                     Loop();
+                                 _ ->
+                                     exit(Reason)
+                             end;
+                         Msg ->
+                             ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]),
+                             Loop()
+                     end
+        end,
+    {Pid1, _} = spawn_monitor(FakeSessionFun),
+    {Pid2, _} = spawn_monitor(FakeSessionFun),
+    ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo),
+    ok = emqx_cm:register_channel(ClientId, Pid1, ConnInfo),
+    ok = emqx_cm:register_channel(ClientId, Pid2, ConnInfo),
+    ?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))),
+    case Reason of
+        noproc -> exit(Pid1, kill), exit(Pid2, kill);
+        _ -> ok
+    end,
+    ok = case Action of
+             kick -> emqx_cm:kick_session(ClientId);
+             discard -> emqx_cm:discard_session(ClientId)
+         end,
+    case Reason =:= timeout orelse Reason =:= noproc of
+        true ->
+            ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),
+            ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R));
+        false ->
+            ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),
+            ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R))
+    end,
+    ok = flush_emqx_pool(),
+    ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
 
-    ok = meck:new(emqx_connection, [passthrough, no_history]),
-    ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
-    ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
-    ok = emqx_cm:discard_session(ClientId),
-    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
-    ok = emqx_cm:discard_session(ClientId),
-    ok = emqx_cm:unregister_channel(ClientId),
-    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
-    ok = emqx_cm:discard_session(ClientId),
-    ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
-    ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
-    ok = emqx_cm:discard_session(ClientId),
-    ok = emqx_cm:unregister_channel(ClientId),
-    ok = meck:unload(emqx_connection).
+%% Channel deregistration is delegated to emqx_pool as a sync tasks.
+%% The emqx_pool is pool of workers, and there is no way to know
+%% which worker was picked for the last deregistration task.
+%% This help function creates a large enough number of async tasks
+%% to sync with the pool workers.
+%% The number of tasks should be large enough to ensure all workers have
+%% the chance to work on at least one of the tasks.
+flush_emqx_pool() ->
+    Self = self(),
+    L = lists:seq(1, 1000),
+    lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L),
+    lists:foreach(fun(I) -> receive {done, I} -> ok end end, L).
 
 t_discard_session_race(_) ->
     ClientId = rand_client_id(),
@@ -231,27 +307,6 @@ t_takeover_session(_) ->
     {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
     emqx_cm:unregister_channel(<<"clientid">>).
 
-t_kick_session(_) ->
-    Info = #{conninfo := ConnInfo} = ?ChanInfo,
-    ok = meck:new(emqx_connection, [passthrough, no_history]),
-    ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
-    ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
-    {error, not_found} = emqx_cm:kick_session(<<"clientid">>),
-    ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
-    ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []),
-    test = emqx_cm:kick_session(<<"clientid">>),
-    erlang:spawn_link(
-        fun() ->
-            ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
-            ok = emqx_cm:insert_channel_info(<<"clientid">>, Info, []),
-
-            timer:sleep(1000)
-        end),
-    ct:sleep(100),
-    test = emqx_cm:kick_session(<<"clientid">>),
-    ok = emqx_cm:unregister_channel(<<"clientid">>),
-    ok = meck:unload(emqx_connection).
-
 t_all_channels(_) ->
     ?assertEqual(true, is_list(emqx_cm:all_channels())).