Przeglądaj źródła

Merge pull request #4943 from emqx/master

Auto-pull-request-on-2021-06-05
Zaiming (Stone) Shi 4 lat temu
rodzic
commit
729e9697cd

+ 3 - 3
.github/workflows/build_packages.yaml

@@ -133,7 +133,7 @@ jobs:
       matrix:
         profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
         erl_otp:
-          - 23.2.7.2
+          - erl23.2.7.2-emqx-2
         exclude:
           - profile: emqx-edge
 
@@ -219,7 +219,7 @@ jobs:
           - centos7
           - centos6
           - raspbian10
-          - raspbian9
+          # - raspbian9
         exclude:
         - os: centos6
           arch: arm64
@@ -268,7 +268,7 @@ jobs:
         if [ $PROFILE = "emqx" ];then
             broker="emqx-ce"
         fi
-        if [[ "$SYSTEM" =~ "raspbian*" ]];then
+        if [ ! -z "$(echo $SYSTEM | grep -oE 'raspbian')" ]; then
             export ARCH="arm"
         fi
 

+ 83 - 55
apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl

@@ -47,6 +47,9 @@
 % FLAG NOT USED
 -define(FNU, 0).
 
+%% erlang:system_time should be unique and random enough
+-define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-",
+                                    integer_to_list(erlang:system_time())])).
 %%--------------------------------------------------------------------
 %% Setups
 %%--------------------------------------------------------------------
@@ -55,6 +58,7 @@ all() ->
     emqx_ct:all(?MODULE).
 
 init_per_suite(Config) ->
+    logger:set_module_level(emqx_sn_gateway, debug),
     emqx_ct_helpers:start_apps([emqx_sn], fun set_special_confs/1),
     Config.
 
@@ -96,7 +100,8 @@ t_connect(_) ->
 
 t_do_2nd_connect(_) ->
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"client_id_test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     timer:sleep(100),
     send_connect_msg(Socket, <<"client_id_other">>),
@@ -116,7 +121,8 @@ t_subscribe(_) ->
     TopicId = ?MAX_PRED_TOPIC_ID + 1,
     ReturnCode = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"client_id_test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     TopicName1 = <<"abcD">>,
     send_register_msg(Socket, TopicName1, MsgId),
@@ -125,12 +131,12 @@ t_subscribe(_) ->
     ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1,
                    CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16,
                    MsgId:16, ReturnCode>>, receive_response(Socket)),
-    ?assertEqual([TopicName1], emqx_broker:topics()),
+    ?assert(lists:member(TopicName1, emqx_broker:topics())),
 
     send_unsubscribe_msg_normal_topic(Socket, TopicName1, MsgId),
     ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
     timer:sleep(100),
-    ?assertEqual([], emqx_broker:topics()),
+    ?assertNot(lists:member(TopicName1, emqx_broker:topics())),
 
     send_disconnect_msg(Socket, undefined),
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@@ -146,7 +152,8 @@ t_subscribe_case01(_) ->
     TopicId = ?MAX_PRED_TOPIC_ID + 1,
     ReturnCode = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"client_id_test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     TopicName1 = <<"abcD">>,
@@ -176,7 +183,8 @@ t_subscribe_case02(_) ->
     ReturnCode = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
 
-    send_connect_msg(Socket, <<"client_id_test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ?CLIENTID),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     Topic1 = ?PREDEF_TOPIC_NAME1,
@@ -206,7 +214,7 @@ t_subscribe_case03(_) ->
     ReturnCode = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
 
-    ClientId = <<"ClientA">>,
+    ClientId = ?CLIENTID,
     send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
@@ -234,7 +242,8 @@ t_subscribe_case04(_) ->
     TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1
     ReturnCode = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"client_id_test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     Topic1 = ?PREDEF_TOPIC_NAME1,
     send_register_msg(Socket, Topic1, MsgId),
@@ -263,7 +272,7 @@ t_subscribe_case05(_) ->
     TopicId2 = ?MAX_PRED_TOPIC_ID + 2,
     ReturnCode = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"testu">>,
+    ClientId = ?CLIENTID,
     send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
@@ -304,8 +313,9 @@ t_subscribe_case06(_) ->
     TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
     TopicId2 = ?MAX_PRED_TOPIC_ID + 2,
     ReturnCode = 0,
+    ClientId = ?CLIENTID,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    send_connect_msg(Socket, ClientId),
 
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     send_register_msg(Socket, <<"abc">>, MsgId),
@@ -340,7 +350,8 @@ t_subscribe_case07(_) ->
     TopicId1 = ?MAX_PRED_TOPIC_ID + 2,
     TopicId2 = ?MAX_PRED_TOPIC_ID + 3,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId),
@@ -362,7 +373,8 @@ t_subscribe_case08(_) ->
     MsgId = 1,
     TopicId2 = 2,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_reserved_topic(Socket, QoS, TopicId2, MsgId),
@@ -383,7 +395,8 @@ t_publish_negqos_case09(_) ->
     MsgId = 1,
     TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     Topic = <<"abc">>,
@@ -416,7 +429,8 @@ t_publish_qos0_case01(_) ->
     MsgId = 1,
     TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     Topic = <<"abc">>,
@@ -447,7 +461,8 @@ t_publish_qos0_case02(_) ->
     MsgId = 1,
     PredefTopicId = ?PREDEF_TOPIC_ID1,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
@@ -476,7 +491,8 @@ t_publish_qos0_case3(_) ->
     MsgId = 1,
     TopicId = ?MAX_PRED_TOPIC_ID + 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     Topic = <<"/a/b/c">>,
@@ -506,7 +522,8 @@ t_publish_qos0_case04(_) ->
     MsgId = 1,
     TopicId0 = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_normal_topic(Socket, QoS, <<"#">>, MsgId),
@@ -536,7 +553,8 @@ t_publish_qos0_case05(_) ->
     MsgId = 1,
     TopicId0 = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     send_subscribe_msg_short_topic(Socket, QoS, <<"/#">>, MsgId),
     ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
@@ -556,7 +574,8 @@ t_publish_qos0_case06(_) ->
     MsgId = 1,
     TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     Topic = <<"abc">>,
@@ -587,7 +606,8 @@ t_publish_qos1_case01(_) ->
     TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
     Topic = <<"abc">>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
     ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
@@ -613,7 +633,8 @@ t_publish_qos1_case02(_) ->
     MsgId = 1,
     PredefTopicId = ?PREDEF_TOPIC_ID1,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
@@ -633,7 +654,8 @@ t_publish_qos1_case03(_) ->
     MsgId = 1,
     TopicId5 = 5,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     send_publish_msg_predefined_topic(Socket, QoS, MsgId, tid(5), <<20, 21, 22, 23>>),
     ?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, receive_response(Socket)),
@@ -651,7 +673,8 @@ t_publish_qos1_case04(_) ->
     MsgId = 7,
     TopicId0 = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     send_subscribe_msg_short_topic(Socket, QoS, <<"ab">>, MsgId),
     ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
@@ -677,7 +700,8 @@ t_publish_qos1_case05(_) ->
     MsgId = 7,
     TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
@@ -702,7 +726,8 @@ t_publish_qos1_case06(_) ->
     MsgId = 7,
     TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
@@ -728,7 +753,8 @@ t_publish_qos2_case01(_) ->
     TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
     Topic = <<"/abc">>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
     send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
     ?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, TopicId1:16, MsgId:16,
@@ -755,7 +781,8 @@ t_publish_qos2_case02(_) ->
     MsgId = 7,
     PredefTopicId = ?PREDEF_TOPIC_ID2,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
@@ -785,7 +812,8 @@ t_publish_qos2_case03(_) ->
     MsgId = 7,
     TopicId0 = 0,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    send_connect_msg(Socket, <<"test">>),
+    ClientId = ?CLIENTID,
+    send_connect_msg(Socket, ClientId),
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_subscribe_msg_normal_topic(Socket, QoS, <<"/#">>, MsgId),
@@ -811,7 +839,7 @@ t_will_case01(_) ->
     WillMsg = <<10, 11, 12, 13, 14>>,
     WillTopic = <<"abc">>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
 
     ok = emqx_broker:subscribe(WillTopic),
 
@@ -846,7 +874,7 @@ t_will_test2(_) ->
     Duration = 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
 
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, <<"goodbye">>, QoS),
@@ -870,7 +898,7 @@ t_will_test3(_) ->
     Duration = 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
 
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_empty_msg(Socket),
@@ -892,7 +920,7 @@ t_will_test4(_) ->
     Duration = 1,
     {ok, Socket} = gen_udp:open(0, [binary]),
 
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, <<"abc">>, QoS),
@@ -920,7 +948,7 @@ t_will_test5(_) ->
     Duration = 5,
     {ok, Socket} = gen_udp:open(0, [binary]),
 
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, <<"abc">>, QoS),
@@ -947,7 +975,7 @@ t_will_case06(_) ->
     WillMsg = <<10, 11, 12, 13, 14>>,
     WillTopic = <<"abc">>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
 
     ok = emqx_broker:subscribe(WillTopic),
 
@@ -983,7 +1011,7 @@ t_asleep_test01_timeout(_) ->
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
 
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1007,7 +1035,7 @@ t_asleep_test02_to_awake_and_back(_) ->
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
 
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1022,13 +1050,13 @@ t_asleep_test02_to_awake_and_back(_) ->
     timer:sleep(4500),
 
     % goto awake state and back
-    send_pingreq_msg(Socket, <<"test">>),
+    send_pingreq_msg(Socket, ClientId),
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
     timer:sleep(4500),
 
     % goto awake state and back
-    send_pingreq_msg(Socket, <<"test">>),
+    send_pingreq_msg(Socket, ClientId),
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
     %% during above procedure, mqtt keepalive timer should not terminate mqtt-sn process
@@ -1045,7 +1073,7 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) ->
     WillPayload = <<10, 11, 12, 13, 14>>,
     MsgId = 1000,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1108,7 +1136,7 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
     WillTopic = <<"dead">>,
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1149,7 +1177,7 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
     timer:sleep(300),
 
     % goto awake state, receive downlink messages, and go back to asleep
-    send_pingreq_msg(Socket, <<"test">>),
+    send_pingreq_msg(Socket, ClientId),
 
     %% 1. get REGISTER first, since this topic has never been registered
     UdpData1 = receive_response(Socket),
@@ -1183,7 +1211,7 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
     WillTopic = <<"dead">>,
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1228,7 +1256,7 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
     timer:sleep(50),
 
     % goto awake state, receive downlink messages, and go back to asleep
-    send_pingreq_msg(Socket, <<"test">>),
+    send_pingreq_msg(Socket, ClientId),
 
     UdpData_reg = receive_response(Socket),
     {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg),
@@ -1261,7 +1289,7 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
     WillTopic = <<"dead">>,
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1304,7 +1332,7 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
     timer:sleep(300),
 
     % goto awake state, receive downlink messages, and go back to asleep
-    send_pingreq_msg(Socket, <<"test">>),
+    send_pingreq_msg(Socket, ClientId),
 
     UdpData = wrap_receive_response(Socket),
     MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData),
@@ -1323,7 +1351,7 @@ t_asleep_test07_to_connected(_) ->
     WillTopic = <<"dead">>,
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1372,7 +1400,7 @@ t_asleep_test08_to_disconnected(_) ->
     WillTopic = <<"dead">>,
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1403,7 +1431,7 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
     WillTopic = <<"dead">>,
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1446,7 +1474,7 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
     ok = emqtt:disconnect(C),
 
     % goto awake state, receive downlink messages, and go back to asleep
-    send_pingreq_msg(Socket, <<"test">>),
+    send_pingreq_msg(Socket, ClientId),
 
     UdpData_reg = receive_response(Socket),
     {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg),
@@ -1482,7 +1510,7 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
     %% send PINGREQ again to enter awake state
-    send_pingreq_msg(Socket, <<"test">>),
+    send_pingreq_msg(Socket, ClientId),
     %% will not receive any buffered PUBLISH messages buffered before last awake, only receive PINGRESP here
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
@@ -1495,7 +1523,7 @@ t_awake_test01_to_connected(_) ->
     WillTopic = <<"dead">>,
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1529,7 +1557,7 @@ t_awake_test02_to_disconnected(_) ->
     WillTopic = <<"dead">>,
     WillPayload = <<10, 11, 12, 13, 14>>,
     {ok, Socket} = gen_udp:open(0, [binary]),
-    ClientId = <<"test">>,
+    ClientId = ?CLIENTID,
     send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
     ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
     send_willtopic_msg(Socket, WillTopic, QoS),
@@ -1545,7 +1573,7 @@ t_awake_test02_to_disconnected(_) ->
     timer:sleep(100),
 
     % goto awake state
-    send_pingreq_msg(Socket, <<"test">>),
+    send_pingreq_msg(Socket, ClientId),
     ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
 
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -1592,7 +1620,7 @@ send_connect_msg(Socket, ClientId) ->
     ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
 
 send_connect_msg_with_will(Socket, Duration, ClientId) ->
-    Length = 10,
+    Length = 6 + byte_size(ClientId),
     Will = 1,
     CleanSession = 1,
     ProtocolId = 1,
@@ -1601,7 +1629,7 @@ send_connect_msg_with_will(Socket, Duration, ClientId) ->
     ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).
 
 send_connect_msg_with_will1(Socket, Duration, ClientId) ->
-    Length = 10,
+    Length = 6 + byte_size(ClientId),
     Will = 1,
     CleanSession = 0,
     ProtocolId = 1,

+ 33 - 24
test/emqx_cm_SUITE.erl

@@ -109,9 +109,13 @@ t_open_session(_) ->
     emqx_cm:unregister_channel(<<"clientid">>),
     ok = meck:unload(emqx_connection).
 
+rand_client_id() ->
+    list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())).
+
 t_open_session_race_condition(_) ->
+    ClientId = rand_client_id(),
     ClientInfo = #{zone => external,
-                   clientid => <<"clientid">>,
+                   clientid => ClientId,
                    username => <<"username">>,
                    peerhost => {127,0,0,1}},
     ConnInfo = #{socktype => tcp,
@@ -136,11 +140,12 @@ t_open_session_race_condition(_) ->
                              exit(Reason)
                      end
                    end,
+    N = 1000,
     [spawn(
       fun() ->
               spawn(OpenASession),
               spawn(OpenASession)
-      end) || _ <- lists:seq(1, 1000)],
+      end) || _ <- lists:seq(1, N)],
 
     WaitingRecv = fun _Wr(N1, N2, 0) ->
                           {N1, N2};
@@ -151,45 +156,49 @@ t_open_session_race_condition(_) ->
                           end
                   end,
 
-    ct:pal("Race condition status: ~p~n", [WaitingRecv(0, 0, 2000)]),
+    {Succeeded, Failed} = WaitingRecv(0, 0, 2 * N),
+    ct:pal("Race condition status: succeeded=~p failed=~p~n", [Succeeded, Failed]),
 
-    ?assertEqual(1, ets:info(emqx_channel, size)),
-    ?assertEqual(1, ets:info(emqx_channel_conn, size)),
-    ?assertEqual(1, ets:info(emqx_channel_registry, size)),
+    ?assertMatch([_], ets:lookup(emqx_channel, ClientId)),
+    [Pid] = emqx_cm:lookup_channels(ClientId),
+    ?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Pid})),
+    ?assertMatch([_], ets:lookup(emqx_channel_registry, ClientId)),
 
-    [Pid] = emqx_cm:lookup_channels(<<"clientid">>),
-    exit(Pid, kill), timer:sleep(100),
-    ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
+    exit(Pid, kill),
+    timer:sleep(100), %% TODO deterministic
+    ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
 
 t_discard_session(_) ->
+    ClientId = rand_client_id(),
     #{conninfo := ConnInfo} = ?ChanInfo,
-    ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
+    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
 
     ok = meck:new(emqx_connection, [passthrough, no_history]),
     ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
     ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
-    ok = emqx_cm:discard_session(<<"clientid">>),
-    ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
-    ok = emqx_cm:discard_session(<<"clientid">>),
-    ok = emqx_cm:unregister_channel(<<"clientid">>),
-    ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
-    ok = emqx_cm:discard_session(<<"clientid">>),
+    ok = emqx_cm:discard_session(ClientId),
+    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
+    ok = emqx_cm:discard_session(ClientId),
+    ok = emqx_cm:unregister_channel(ClientId),
+    ok = emqx_cm:register_channel(ClientId, self(), ConnInfo),
+    ok = emqx_cm:discard_session(ClientId),
     ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
     ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
-    ok = emqx_cm:discard_session(<<"clientid">>),
-    ok = emqx_cm:unregister_channel(<<"clientid">>),
+    ok = emqx_cm:discard_session(ClientId),
+    ok = emqx_cm:unregister_channel(ClientId),
     ok = meck:unload(emqx_connection).
 
 t_discard_session_race(_) ->
+    ClientId = rand_client_id(),
     ?check_trace(
        begin
          #{conninfo := ConnInfo0} = ?ChanInfo,
          ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
          {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end),
-         ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo),
+         ok = emqx_cm:register_channel(ClientId, Pid, ConnInfo),
          Pid ! stop,
          receive {'DOWN', Ref, process, Pid, normal} -> ok end,
-         ok = emqx_cm:discard_session(<<"clientid">>),
+         ok = emqx_cm:discard_session(ClientId),
          {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000)
        end,
        fun(_, _) ->
@@ -235,10 +244,10 @@ t_all_channels(_) ->
     ?assertEqual(true, is_list(emqx_cm:all_channels())).
 
 t_lock_clientid(_) ->
-    {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
-    {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
-    {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>),
-    {true, _Nodes} = emqx_cm_locker:unlock(<<"clientid">>).
+    {true, Nodes} = emqx_cm_locker:lock(<<"clientid">>),
+    ?assertEqual({true, Nodes}, emqx_cm_locker:lock(<<"clientid">>)),
+    ?assertEqual({true, Nodes}, emqx_cm_locker:unlock(<<"clientid">>)),
+    ?assertEqual({true, Nodes}, emqx_cm_locker:unlock(<<"clientid">>)).
 
 t_message(_) ->
     ?CM ! testing,