Просмотр исходного кода

Merge pull request #4768 from emqx/resolve-conflict-to-5.0

Auto-pull-request-on-2021-05-08
Zaiming (Stone) Shi 4 лет назад
Родитель
Сommit
70d2e0e905
38 измененных файлов с 266 добавлено и 128 удалено
  1. 2 2
      .ci/fvt_tests/relup.lux
  2. 6 0
      .github/workflows/build_packages.yaml
  3. 4 1
      .github/workflows/build_slim_packages.yaml
  4. 1 1
      .github/workflows/run_test_cases.yaml
  5. 1 1
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src
  6. 6 0
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src
  7. 6 6
      apps/emqx_coap/test/emqx_coap_SUITE.erl
  8. 9 9
      apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl
  9. 3 3
      apps/emqx_exhook/priv/protos/exhook.proto
  10. 1 1
      apps/emqx_exhook/src/emqx_exhook.app.src
  11. 15 0
      apps/emqx_exhook/src/emqx_exhook.appup.src
  12. 7 4
      apps/emqx_lwm2m/test/emqx_lwm2m_SUITE.erl
  13. 1 1
      apps/emqx_management/src/emqx_management.app.src
  14. 14 4
      apps/emqx_management/src/emqx_management.appup.src
  15. 1 1
      apps/emqx_management/src/emqx_mgmt.erl
  16. 6 0
      lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl
  17. 1 1
      lib-ce/emqx_modules/src/emqx_modules.app.src
  18. 10 2
      lib-ce/emqx_modules/src/emqx_modules.appup.src
  19. 1 1
      lib-ce/emqx_telemetry/src/emqx_telemetry.app.src
  20. 15 0
      lib-ce/emqx_telemetry/src/emqx_telemetry.appup.src
  21. 2 0
      pkg-vsn.sh
  22. 1 1
      priv/emqx.schema
  23. 5 3
      rebar.config
  24. 12 4
      rebar.config.erl
  25. 2 1
      scripts/apps-version-check.sh
  26. 23 15
      scripts/get-dashboard.sh
  27. 1 1
      src/emqx.app.src
  28. 24 6
      src/emqx.appup.src
  29. 5 1
      src/emqx_connection.erl
  30. 8 21
      src/emqx_frame.erl
  31. 9 10
      src/emqx_http_lib.erl
  32. 1 7
      src/emqx_logger_textfmt.erl
  33. 1 1
      src/emqx_node_dump.erl
  34. 12 5
      src/emqx_plugins.erl
  35. 6 3
      src/emqx_ws_connection.erl
  36. 14 10
      test/emqx_cm_SUITE.erl
  37. 20 1
      test/emqx_frame_SUITE.erl
  38. 10 0
      test/emqx_http_lib_tests.erl

+ 2 - 2
.ci/fvt_tests/relup.lux

@@ -72,7 +72,7 @@
 
 [shell bench]
     !cd $BENCH_PATH
-    !./emqtt_bench pub -c 10 -I 1000 -t t/%i -s 64 -L 600
+    !./emqtt_bench pub -c 10 -I 1000 -t t/%i -s 64 -L 300
     ???sent
 
 [shell emqx]
@@ -109,7 +109,7 @@
     ???publish complete
     ??SH-PROMPT:
     !curl http://127.0.0.1:8080/counter
-    ???{"data":600,"code":0}
+    ???{"data":300,"code":0}
     ?SH-PROMPT
 
 [shell emqx2]

+ 6 - 0
.github/workflows/build_packages.yaml

@@ -95,6 +95,10 @@ jobs:
         if (Test-Path rebar.lock) {
             Remove-Item -Force -Path rebar.lock
         }
+        make ensure-rebar3
+        copy rebar3 "${{ steps.install_erlang.outputs.erlpath }}\bin"
+        ls "${{ steps.install_erlang.outputs.erlpath }}\bin"
+        rebar3 --help
         make ${{ matrix.profile }}
         mkdir -p _packages/${{ matrix.profile }}
         Compress-Archive -Path _build/${{ matrix.profile }}/rel/emqx -DestinationPath _build/${{ matrix.profile }}/rel/$pkg_name
@@ -155,6 +159,8 @@ jobs:
     - name: build
       run: |
         . $HOME/.kerl/${{ matrix.erl_otp }}/activate
+        make -C source ensure-rebar3
+        sudo cp source/rebar3 /usr/local/bin/rebar3
         make -C source ${{ matrix.profile }}-zip
     - name: test
       run: |

+ 4 - 1
.github/workflows/build_slim_packages.yaml

@@ -53,7 +53,7 @@ jobs:
     strategy:
       matrix:
         erl_otp:
-        - 23.2.7.2
+        - 23.2.7.2-emqx-2
 
     steps:
     - uses: actions/checkout@v1
@@ -82,11 +82,14 @@ jobs:
       if: steps.cache.outputs.cache-hit != 'true'
       timeout-minutes: 60
       run: |
+        export OTP_GITHUB_URL="https://github.com/emqx/otp"
         kerl build ${{ matrix.erl_otp }}
         kerl install ${{ matrix.erl_otp }} $HOME/.kerl/${{ matrix.erl_otp }}
     - name: build
       run: |
         . $HOME/.kerl/${{ matrix.erl_otp }}/activate
+        make ensure-rebar3
+        sudo cp rebar3 /usr/local/bin/rebar3
         make ${EMQX_NAME}-zip
     - name: test
       run: |

+ 1 - 1
.github/workflows/run_test_cases.yaml

@@ -130,7 +130,7 @@ jobs:
             docker exec --env-file .env -i erlang bash -c "make coveralls"
         - name: cat rebar.crashdump
           if: failure()
-          run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump' fi
+          run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi
         - uses: actions/upload-artifact@v1
           if: failure()
           with:

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_mqtt,
  [{description, "EMQ X Bridge to MQTT Broker"},
-  {vsn, "4.3.0"}, % strict semver, bump manually!
+  {vsn, "4.3.1"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,replayq,emqtt]},

+ 6 - 0
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src

@@ -2,9 +2,15 @@
 
 {VSN,
   [
+    {"4.3.0", [
+     {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}
+   ]},
     {<<".*">>, []}
   ],
   [
+    {"4.3.0", [
+     {load_module, emqx_bridge_worker, brutal_purge, soft_purge, []}
+    ]},
     {<<".*">>, []}
   ]
 }.

+ 6 - 6
apps/emqx_coap/test/emqx_coap_SUITE.erl

@@ -120,7 +120,7 @@ t_observe_acl_deny(_Config) ->
     ok = meck:unload(emqx_access_control).
 
 t_observe_wildcard(_Config) ->
-    Topic = <<"+/b">>, TopicStr = http_uri:encode(binary_to_list(Topic)),
+    Topic = <<"+/b">>, TopicStr = emqx_http_lib:uri_encode(binary_to_list(Topic)),
     Payload = <<"123">>,
     Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
     {ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
@@ -143,7 +143,7 @@ t_observe_wildcard(_Config) ->
     [] = emqx:subscribers(Topic).
 
 t_observe_pub(_Config) ->
-    Topic = <<"+/b">>, TopicStr = http_uri:encode(binary_to_list(Topic)),
+    Topic = <<"+/b">>, TopicStr = emqx_http_lib:uri_encode(binary_to_list(Topic)),
     Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
     {ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
     ?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]),
@@ -152,7 +152,7 @@ t_observe_pub(_Config) ->
     ?assert(is_pid(SubPid)),
 
     Topic2 = <<"a/b">>, Payload2 = <<"UFO">>,
-    TopicStr2 = http_uri:encode(binary_to_list(Topic2)),
+    TopicStr2 = emqx_http_lib:uri_encode(binary_to_list(Topic2)),
     URI2 = "coap://127.0.0.1/mqtt/"++TopicStr2++"?c=client1&u=tom&p=secret",
 
     Reply2 = er_coap_client:request(put, URI2, #coap_content{format = <<"application/octet-stream">>, payload = Payload2}),
@@ -164,7 +164,7 @@ t_observe_pub(_Config) ->
     ?assertEqual(Payload2, PayloadRecv2),
 
     Topic3 = <<"j/b">>, Payload3 = <<"ET629">>,
-    TopicStr3 = http_uri:encode(binary_to_list(Topic3)),
+    TopicStr3 = emqx_http_lib:uri_encode(binary_to_list(Topic3)),
     URI3 = "coap://127.0.0.1/mqtt/"++TopicStr3++"?c=client2&u=mike&p=guess",
     Reply3 = er_coap_client:request(put, URI3, #coap_content{format = <<"application/octet-stream">>, payload = Payload3}),
     {ok,changed, _} = Reply3,
@@ -186,7 +186,7 @@ t_one_clientid_sub_2_topics(_Config) ->
     [SubPid] = emqx:subscribers(Topic1),
     ?assert(is_pid(SubPid)),
 
-    Topic2 = <<"x/y">>, TopicStr2 = http_uri:encode(binary_to_list(Topic2)),
+    Topic2 = <<"x/y">>, TopicStr2 = emqx_http_lib:uri_encode(binary_to_list(Topic2)),
     Payload2 = <<"456">>,
     Uri2 = "coap://127.0.0.1/mqtt/"++TopicStr2++"?c=client1&u=tom&p=secret",
     {ok, Pid2, N2, Code2, Content2} = er_coap_observer:observe(Uri2),
@@ -217,7 +217,7 @@ t_invalid_parameter(_Config) ->
     %% "cid=client2" is invaid
     %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
     Topic3 = <<"a/b">>, Payload3 = <<"ET629">>,
-    TopicStr3 = http_uri:encode(binary_to_list(Topic3)),
+    TopicStr3 = emqx_http_lib:uri_encode(binary_to_list(Topic3)),
     URI3 = "coap://127.0.0.1/mqtt/"++TopicStr3++"?cid=client2&u=tom&p=simple",
     Reply3 = er_coap_client:request(put, URI3, #coap_content{format = <<"application/octet-stream">>, payload = Payload3}),
     ?assertMatch({error,bad_request}, Reply3),

+ 9 - 9
apps/emqx_coap/test/emqx_coap_pubsub_SUITE.erl

@@ -173,7 +173,7 @@ t_case01_publish_post(_Config) ->
     ?assertEqual(<<"42">>, CT2),
 
     %% post to publish message to topic maintopic/topic1
-    FullTopicStr = http_uri:encode(binary_to_list(FullTopic)),
+    FullTopicStr = emqx_http_lib:uri_encode(binary_to_list(FullTopic)),
     URI2 = "coap://127.0.0.1/ps/"++FullTopicStr++"?c=client1&u=tom&p=secret",
     PubPayload = <<"PUBLISH">>,
 
@@ -286,7 +286,7 @@ t_case01_publish_put(_Config) ->
     ?assertEqual(<<"42">>, CT2),
 
     %% put to publish message to topic maintopic/topic1
-    FullTopicStr = http_uri:encode(binary_to_list(FullTopic)),
+    FullTopicStr = emqx_http_lib:uri_encode(binary_to_list(FullTopic)),
     URI2 = "coap://127.0.0.1/ps/"++FullTopicStr++"?c=client1&u=tom&p=secret",
     PubPayload = <<"PUBLISH">>,
 
@@ -430,7 +430,7 @@ t_case01_subscribe(_Config) ->
 t_case02_subscribe(_Config) ->
     Topic = <<"a/b">>,
     TopicStr = binary_to_list(Topic),
-    PercentEncodedTopic = http_uri:encode(TopicStr),
+    PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
     Payload = <<"payload">>,
 
     %% post to publish a new topic "a/b", and the topic is created
@@ -477,7 +477,7 @@ t_case03_subscribe(_Config) ->
     %% Subscribe to the unexisted topic "a/b", got not_found
     Topic = <<"a/b">>,
     TopicStr = binary_to_list(Topic),
-    PercentEncodedTopic = http_uri:encode(TopicStr),
+    PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
     Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
     {error, not_found} = er_coap_observer:observe(Uri),
 
@@ -487,7 +487,7 @@ t_case04_subscribe(_Config) ->
     %% Subscribe to the wildcad topic "+/b", got bad_request
     Topic = <<"+/b">>,
     TopicStr = binary_to_list(Topic),
-    PercentEncodedTopic = http_uri:encode(TopicStr),
+    PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
     Uri = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",
     {error, bad_request} = er_coap_observer:observe(Uri),
 
@@ -582,7 +582,7 @@ t_case04_read(_Config) ->
 t_case05_read(_Config) ->
     Topic = <<"a/b">>,
     TopicStr = binary_to_list(Topic),
-    PercentEncodedTopic = http_uri:encode(TopicStr),
+    PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
     Payload = <<"payload">>,
 
     %% post to publish a new topic "a/b", and the topic is created
@@ -609,7 +609,7 @@ t_case05_read(_Config) ->
 t_case01_delete(_Config) ->
     TopicInPayload = <<"a/b">>,
     TopicStr = binary_to_list(TopicInPayload),
-    PercentEncodedTopic = http_uri:encode(TopicStr),
+    PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
     Payload = list_to_binary("<"++PercentEncodedTopic++">;ct=42"),
     URI = "coap://127.0.0.1/ps/"++"?c=client1&u=tom&p=secret",
 
@@ -621,7 +621,7 @@ t_case01_delete(_Config) ->
 
     %% Client post to CREATE topic "a/b/c"
     TopicInPayload1 = <<"a/b/c">>,
-    PercentEncodedTopic1 = http_uri:encode(binary_to_list(TopicInPayload1)),
+    PercentEncodedTopic1 = emqx_http_lib:uri_encode(binary_to_list(TopicInPayload1)),
     Payload1 = list_to_binary("<"++PercentEncodedTopic1++">;ct=42"),
     Reply1 = er_coap_client:request(post, URI, #coap_content{format = <<"application/link-format">>, payload = Payload1}),
     ?LOGT("Reply =~p", [Reply1]),
@@ -643,7 +643,7 @@ t_case01_delete(_Config) ->
 t_case02_delete(_Config) ->
     TopicInPayload = <<"a/b">>,
     TopicStr = binary_to_list(TopicInPayload),
-    PercentEncodedTopic = http_uri:encode(TopicStr),
+    PercentEncodedTopic = emqx_http_lib:uri_encode(TopicStr),
 
     %% DELETE the unexisted topic "a/b"
     Uri1 = "coap://127.0.0.1/ps/"++PercentEncodedTopic++"?c=client1&u=tom&p=secret",

+ 3 - 3
apps/emqx_exhook/priv/protos/exhook.proto

@@ -237,14 +237,14 @@ message EmptySuccess { }
 message ValuedResponse {
 
   // The responsed value type
-  //  - ignore: Ignore the responsed value
   //  - contiune: Use the responsed value and execute the next hook
+  //  - ignore: Ignore the responsed value
   //  - stop_and_return: Use the responsed value and stop the chain executing
   enum ResponsedType {
 
-    IGNORE = 0;
+    CONTINUE = 0;
 
-    CONTINUE = 1;
+    IGNORE = 1;
 
     STOP_AND_RETURN = 2;
   }

+ 1 - 1
apps/emqx_exhook/src/emqx_exhook.app.src

@@ -1,6 +1,6 @@
 {application, emqx_exhook,
  [{description, "EMQ X Extension for Hook"},
-  {vsn, "4.3.0"},
+  {vsn, "4.3.1"},
   {modules, []},
   {registered, []},
   {mod, {emqx_exhook_app, []}},

+ 15 - 0
apps/emqx_exhook/src/emqx_exhook.appup.src

@@ -0,0 +1,15 @@
+%% -*-: erlang -*-
+{VSN,
+ [
+    {"4.3.0", [
+      {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
+    ]},
+    {<<".*">>, []}
+ ],
+ [
+    {"4.3.0", [
+      {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}
+    ]},
+    {<<".*">>, []}
+ ]
+}.

+ 7 - 4
apps/emqx_lwm2m/test/emqx_lwm2m_SUITE.erl

@@ -1886,8 +1886,11 @@ std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic) ->
     timer:sleep(100).
 
 resolve_uri(Uri) ->
-    {ok, {Scheme, _UserInfo, Host, PortNo, Path, Query}} =
-        http_uri:parse(Uri, [{scheme_defaults, [{coap, ?DEFAULT_COAP_PORT}, {coaps, ?DEFAULT_COAPS_PORT}]}]),
+    {ok, #{scheme := Scheme,
+           host := Host,
+           port := PortNo,
+           path := Path} = URIMap} = emqx_http_lib:uri_parse(Uri),
+    Query = maps:get(query, URIMap, ""),
     {ok, PeerIP} = inet:getaddr(Host, inet),
     {Scheme, {PeerIP, PortNo}, split_path(Path), split_query(Query)}.
 
@@ -1896,7 +1899,7 @@ split_path([$/]) -> [];
 split_path([$/ | Path]) -> split_segments(Path, $/, []).
 
 split_query([]) -> [];
-split_query([$? | Path]) -> split_segments(Path, $&, []).
+split_query(Path) -> split_segments(Path, $&, []).
 
 split_segments(Path, Char, Acc) ->
     case string:rchr(Path, Char) of
@@ -1908,7 +1911,7 @@ split_segments(Path, Char, Acc) ->
     end.
 
 make_segment(Seg) ->
-    list_to_binary(http_uri:decode(Seg)).
+    list_to_binary(emqx_http_lib:uri_decode(Seg)).
 
 
 get_coap_path(Options) ->

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -1,6 +1,6 @@
 {application, emqx_management,
  [{description, "EMQ X Management API and CLI"},
-  {vsn, "4.3.2"}, % strict semver, bump manually!
+  {vsn, "4.3.3"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_management_sup]},
   {applications, [kernel,stdlib,minirest]},

+ 14 - 4
apps/emqx_management/src/emqx_management.appup.src

@@ -1,14 +1,24 @@
 %% -*-: erlang -*-
-{"4.3.2",
- [ {<<"4.3.[0-1]">>,
+{VSN,
+ [ {"4.3.2",
+    [ {load_module, emqx_mgmt, brutal_purge, soft_purge, []}
+    ]},
+   {<<"4.3.[0-1]">>,
     [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []}
     , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []}
-    ]}
+    , {load_module, emqx_mgmt, brutal_purge, soft_purge, []}
+    ]},
+   {<<".*">>, []}
  ],
  [
+   {"4.3.2",
+    [ {load_module, emqx_mgmt, brutal_purge, soft_purge, []}
+    ]},
    {<<"4.3.[0-1]">>,
     [ {load_module, emqx_mgmt_data_backup, brutal_purge, soft_purge, []}
     , {load_module, emqx_mgmt_cli, brutal_purge, soft_purge, []}
-    ]}
+    , {load_module, emqx_mgmt, brutal_purge, soft_purge, []}
+    ]},
+   {<<".*">>, []}
  ]
 }.

+ 1 - 1
apps/emqx_management/src/emqx_mgmt.erl

@@ -139,7 +139,7 @@ node_info(Node) when Node =:= node() ->
     Info#{node              => node(),
           otp_release       => iolist_to_binary(otp_rel()),
           memory_total      => proplists:get_value(allocated, Memory),
-          memory_used       => proplists:get_value(used, Memory),
+          memory_used       => proplists:get_value(total, Memory),
           process_available => erlang:system_info(process_limit),
           process_used      => erlang:system_info(process_count),
           max_fds           => proplists:get_value(max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))),

+ 6 - 0
lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl

@@ -53,6 +53,12 @@
         , unregister/2
         ]).
 
+-export([ get_topic_metrics/2
+        , register_topic_metrics/2
+        , unregister_topic_metrics/2
+        , unregister_all_topic_metrics/1
+        ]).
+
 list(#{topic := Topic0}, _Params) ->
     execute_when_enabled(fun() ->
         Topic = emqx_mgmt_util:urldecode(Topic0),

+ 1 - 1
lib-ce/emqx_modules/src/emqx_modules.app.src

@@ -1,6 +1,6 @@
 {application, emqx_modules,
  [{description, "EMQ X Module Management"},
-  {vsn, "4.3.1"},
+  {vsn, "4.3.2"},
   {modules, []},
   {applications, [kernel,stdlib]},
   {mod, {emqx_modules_app, []}},

+ 10 - 2
lib-ce/emqx_modules/src/emqx_modules.appup.src

@@ -1,14 +1,22 @@
 %% -*-: erlang -*-
 {VSN,
   [
+    {"4.3.1", [
+      {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
+    ]},
     {"4.3.0", [
-      {update, emqx_mod_delayed, {advanced, []}}
+      {update, emqx_mod_delayed, {advanced, []}},
+      {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
     ]},
     {<<".*">>, []}
   ],
   [
+    {"4.3.1", [
+      {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
+    ]},
     {"4.3.0", [
-      {update, emqx_mod_delayed, {advanced, []}}
+      {update, emqx_mod_delayed, {advanced, []}},
+      {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
     ]},
     {<<".*">>, []}
   ]

+ 1 - 1
lib-ce/emqx_telemetry/src/emqx_telemetry.app.src

@@ -1,6 +1,6 @@
 {application, emqx_telemetry,
  [{description, "EMQ X Telemetry"},
-  {vsn, "4.3.0"}, % strict semver, bump manually!
+  {vsn, "4.3.1"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_telemetry_sup]},
   {applications, [kernel,stdlib]},

+ 15 - 0
lib-ce/emqx_telemetry/src/emqx_telemetry.appup.src

@@ -0,0 +1,15 @@
+%% -*- mode: erlang -*-
+{VSN,
+ [
+   {"4.3.0", [
+     {load_module, emqx_telemetry, brutal_purge, soft_purge, []}
+   ]},
+   {<<".*">>, []}
+ ],
+ [
+   {"4.3.0", [
+     {load_module, emqx_telemetry, brutal_purge, soft_purge, []}
+   ]},
+   {<<".*">>, []}
+ ]
+}.

+ 2 - 0
pkg-vsn.sh

@@ -12,8 +12,10 @@ else
     EDITION='opensource'
 fi
 
+## emqx_release.hrl is the single source of truth for release version
 RELEASE="$(grep -E "define.+EMQX_RELEASE.+${EDITION}" include/emqx_release.hrl | cut -d '"' -f2)"
 
+## git commit hash is added as suffix in case the git tag and release version is not an exact match
 if [ -d .git ] && ! git describe --tags --match "[e|v]${RELEASE}" --exact >/dev/null 2>&1; then
     SUFFIX="-$(git rev-parse HEAD | cut -b1-8)"
 fi

+ 1 - 1
priv/emqx.schema

@@ -2301,7 +2301,7 @@ end}.
 
 {mapping, "broker.session_locking_strategy", "emqx.session_locking_strategy", [
   {default, quorum},
-  {datatype, {enum, [local,one,quorum,all]}}
+  {datatype, {enum, [local,leader,quorum,all]}}
 ]}.
 
 %% @doc Shared Subscription Dispatch Strategy.

+ 5 - 3
rebar.config

@@ -8,7 +8,8 @@
 
 {edoc_opts, [{preprocess,true}]}.
 {erl_opts, [warn_unused_vars,warn_shadow_vars,warn_unused_import,
-            warn_obsolete_guard,compressed]}.
+            warn_obsolete_guard,compressed,
+            {d, snk_kind, msg}]}.
 
 {extra_src_dirs, [{"etc", [{recursive,true}]}]}.
 
@@ -35,7 +36,8 @@
 {erl_first_files, ["src/emqx_logger.erl", "src/emqx_rule_actions_trans.erl"]}.
 
 {deps,
-    [ {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.5"}}}
+    [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
+    , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.5"}}}
     , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}}
     , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
@@ -53,7 +55,7 @@
     , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
     , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
     , {getopt, "1.0.1"}
-    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.12.0"}}}
+    , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
     ]}.
 
 {xref_ignores,

+ 12 - 4
rebar.config.erl

@@ -2,10 +2,15 @@
 
 -export([do/2]).
 
-do(_Dir, CONFIG) ->
-    {HasElixir, C1} = deps(CONFIG),
-    Config = dialyzer(C1),
-    maybe_dump(Config ++ [{overrides, overrides()}] ++ coveralls() ++ config(HasElixir)).
+do(Dir, CONFIG) ->
+    case iolist_to_binary(Dir) of
+        <<".">> ->
+            {HasElixir, C1} = deps(CONFIG),
+            Config = dialyzer(C1),
+            maybe_dump(Config ++ [{overrides, overrides()}] ++ coveralls() ++ config(HasElixir));
+        _ ->
+            CONFIG
+    end.
 
 bcrypt() ->
     {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}.
@@ -46,6 +51,8 @@ overrides() ->
     [ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]}
             , {erl_opts, [{compile_info, [{emqx_vsn, get_vsn()}]}]}
             ]}
+    , {add, snabbkaffe,
+       [{erl_opts, common_compile_opts()}]}
     ] ++ community_plugin_overrides().
 
 community_plugin_overrides() ->
@@ -106,6 +113,7 @@ test_deps() ->
 common_compile_opts() ->
     [ debug_info % alwyas include debug_info
     , {compile_info, [{emqx_vsn, get_vsn()}]}
+    , {d, snk_kind, msg}
     ] ++
     [{d, 'EMQX_ENTERPRISE'} || is_enterprise()] ++
     [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1" ].

+ 2 - 1
scripts/apps-version-check.sh

@@ -1,7 +1,8 @@
 #!/bin/bash
 set -euo pipefail
 
-latest_release=$(git describe --tags "$(git rev-list --tags --max-count=1 --remotes=refs/remote/origin)")
+remote="refs/remote/$(git remote -v | grep fetch | grep 'emqx/emqx' | awk '{print $1}')"
+latest_release=$(git describe --tags "$(git rev-list --tags --max-count=1 --remotes="$remote")")
 
 bad_app_count=0
 

+ 23 - 15
scripts/get-dashboard.sh

@@ -12,11 +12,14 @@ if [ -f 'EMQX_ENTERPRISE' ]; then
     DASHBOARD_PATH='lib-ee/emqx_dashboard/priv'
     DASHBOARD_REPO='emqx-enterprise-dashboard-frontend-src'
     AUTH="Authorization: token $(cat scripts/git-token)"
+    # have to be resolved with auth and redirect
+    DIRECT_DOWNLOAD_URL=""
 else
     VERSION="${EMQX_CE_DASHBOARD_VERSION}"
     DASHBOARD_PATH='lib-ce/emqx_dashboard/priv'
     DASHBOARD_REPO='emqx-dashboard-frontend'
     AUTH=""
+    DIRECT_DOWNLOAD_URL="https://github.com/emqx/${DASHBOARD_REPO}/releases/download/${VERSION}/emqx-dashboard.zip"
 fi
 
 case $(uname) in
@@ -32,27 +35,32 @@ if [ -d "$DASHBOARD_PATH/www" ] && [ "$(version)" = "$VERSION" ]; then
     exit 0
 fi
 
-get_assets(){
+find_url() {
     # Get the download URL of our desired asset
-    download_url="$(curl --silent --show-error \
-        --header "${AUTH}" \
-        --header "Accept: application/vnd.github.v3+json" \
-        "https://api.github.com/repos/emqx/${DASHBOARD_REPO}/releases/tags/${VERSION}" \
-        | jq --raw-output ".assets[] | select(.name==\"${RELEASE_ASSET_FILE}\").url" \
-        | tr -d '\n' | tr -d '\r')"
+    release_url="https://api.github.com/repos/emqx/${DASHBOARD_REPO}/releases/tags/${VERSION}"
+    release_info="$(curl --silent --show-error --header "${AUTH}" --header "Accept: application/vnd.github.v3+json" "$release_url")"
+    if ! download_url="$(echo "$release_info" | jq --raw-output ".assets[] | select(.name==\"${RELEASE_ASSET_FILE}\").url" | tr -d '\n' | tr -d '\r')"; then
+        echo "failed to query $release_url"
+        echo "${release_info}"
+        exit 1
+    fi
     # Get GitHub's S3 redirect URL
-    redirect_url=$(curl --silent --show-error \
-        --header "${AUTH}" \
-        --header "Accept: application/octet-stream" \
-        --write-out "%{redirect_url}" \
-        "$download_url")
     curl --silent --show-error \
+         --header "${AUTH}" \
          --header "Accept: application/octet-stream" \
-         --output "${RELEASE_ASSET_FILE}" \
-         "$redirect_url"
+         --write-out "%{redirect_url}" \
+         "$download_url"
 }
 
-get_assets
+if [ -z "$DIRECT_DOWNLOAD_URL" ]; then
+    DIRECT_DOWNLOAD_URL="$(find_url)"
+fi
+
+curl -L --silent --show-error \
+     --header "Accept: application/octet-stream" \
+     --output "${RELEASE_ASSET_FILE}" \
+     "$DIRECT_DOWNLOAD_URL"
+
 unzip -q "$RELEASE_ASSET_FILE" -d "$DASHBOARD_PATH"
 rm -rf "$DASHBOARD_PATH/www"
 mv "$DASHBOARD_PATH/dist" "$DASHBOARD_PATH/www"

+ 1 - 1
src/emqx.app.src

@@ -1,7 +1,7 @@
 {application, emqx,
  [{id, "emqx"},
   {description, "EMQ X"},
-  {vsn, "4.3.2"}, % strict semver, bump manually!
+  {vsn, "4.3.3"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},

+ 24 - 6
src/emqx.appup.src

@@ -1,17 +1,25 @@
-%% -*-: erlang -*-
+%% -*- mode: erlang -*-
 {VSN,
  [
+   {"4.3.2", [
+     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
+   ]},
    {"4.3.1", [
+     {load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_frame, brutal_purge, soft_purge, []},
      {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
      {load_module, emqx_channel, brutal_purge, soft_purge, []},
      {load_module, emqx_app, brutal_purge, soft_purge, []},
-     {load_module, emqx_plugins, brutal_purge, soft_purge, []}
+     {load_module, emqx_plugins, brutal_purge, soft_purge, []},
+     {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []},
+     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
    ]},
    {"4.3.0", [
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
+     {load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_frame, brutal_purge, soft_purge, []},
@@ -21,24 +29,33 @@
      {load_module, emqx_channel, brutal_purge, soft_purge, []},
      {load_module, emqx_app, brutal_purge, soft_purge, []},
      {load_module, emqx_plugins, brutal_purge, soft_purge, []},
-     %%
+     {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []},
      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
-     {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
+     {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}},
+     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
    ]},
    {<<".*">>, []}
  ],
  [
+   {"4.3.2", [
+     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
+   ]},
    {"4.3.1", [
+     {load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
+     {load_module, emqx_frame, brutal_purge, soft_purge, []},
      {load_module, emqx_cm, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_node_dump, brutal_purge, soft_purge, []},
      {load_module, emqx_channel, brutal_purge, soft_purge, []},
      {load_module, emqx_app, brutal_purge, soft_purge, []},
-     {load_module, emqx_plugins, brutal_purge, soft_purge, []}
+     {load_module, emqx_plugins, brutal_purge, soft_purge, []},
+     {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []},
+     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
    ]},
    {"4.3.0", [
      {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []},
+     {load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_connection, brutal_purge, soft_purge, []},
      {load_module, emqx_congestion, brutal_purge, soft_purge, []},
      {load_module, emqx_frame, brutal_purge, soft_purge, []},
@@ -48,10 +65,11 @@
      {load_module, emqx_channel, brutal_purge, soft_purge, []},
      {load_module, emqx_app, brutal_purge, soft_purge, []},
      {load_module, emqx_plugins, brutal_purge, soft_purge, []},
+     {load_module, emqx_logger_textfmt, brutal_purge, soft_purge, []},
      %% Just load the module. We don't need to change the 'messages.retained'
      %% and 'messages.retained' counter type.
      {load_module, emqx_metrics, brutal_purge, soft_purge, []},
-     {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}}
+     {load_module, emqx_http_lib, brutal_purge, soft_purge, []}
    ]},
    {<<".*">>, []}
  ]

+ 5 - 1
src/emqx_connection.erl

@@ -41,7 +41,8 @@
         , stats/1
         ]).
 
--export([ async_set_keepalive/4
+-export([ async_set_keepalive/3
+        , async_set_keepalive/4
         , async_set_socket_options/2
         ]).
 
@@ -200,6 +201,9 @@ stats(#state{transport = Transport,
 %%
 %% NOTE: This API sets TCP socket options, which has nothing to do with
 %%       the MQTT layer's keepalive (PINGREQ and PINGRESP).
+async_set_keepalive(Idle, Interval, Probes) ->
+    async_set_keepalive(self(), Idle, Interval, Probes).
+
 async_set_keepalive(Pid, Idle, Interval, Probes) ->
     Options = [ {keepalive, true}
               , {raw, 6, 4, <<Idle:32/native>>}

+ 8 - 21
src/emqx_frame.erl

@@ -121,17 +121,8 @@ parse(Bin, {{body, #{hdr := Header,
                      len := Length,
                      rest := Body}
              }, Options}) when is_binary(Bin) ->
-    BodyBytes = body_bytes(Body),
-    {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin),
-    NewBody = append_body(Body, NewBodyPart),
-    parse_frame(NewBody, Tail, Header, Length, Options).
-
-%% split given binary with the first N bytes
-split(N, Bin) when N =< 0 ->
-    {Bin, <<>>};
-split(N, Bin) when N =< size(Bin) ->
-    <<H:N/binary, T/binary>> = Bin,
-    {H, T}.
+    NewBody = append_body(Body, Bin),
+    parse_frame(NewBody, Header, Length, Options).
 
 parse_remaining_len(<<>>, Header, Options) ->
     {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
@@ -178,19 +169,15 @@ append_body(H, T) when is_binary(H) ->
 append_body(?Q(Bytes, Q), T) ->
     ?Q(Bytes + iolist_size(T), queue:in(T, Q)).
 
-flatten_body(Body, Tail) when is_binary(Body) -> <<Body/binary, Tail/binary>>;
-flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]).
+flatten_body(Body) when is_binary(Body) -> Body;
+flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)).
 
+parse_frame(Body, Header, 0, Options) ->
+    {ok, packet(Header), flatten_body(Body), ?none(Options)};
 parse_frame(Body, Header, Length, Options) ->
-    %% already appended
-    parse_frame(Body, _SplitTail = <<>>, Header, Length, Options).
-
-parse_frame(Body, Tail, Header, 0, Options) ->
-    {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)};
-parse_frame(Body, Tail, Header, Length, Options) ->
     case body_bytes(Body) >= Length of
         true ->
-            <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body, Tail),
+            <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body),
             case parse_packet(Header, FrameBin, Options) of
                 {Variable, Payload} ->
                     {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};
@@ -202,7 +189,7 @@ parse_frame(Body, Tail, Header, Length, Options) ->
         false ->
             {more, {{body, #{hdr => Header,
                              len => Length,
-                             rest => append_body(Body, Tail)
+                             rest => Body
                             }}, Options}}
     end.
 

+ 9 - 10
src/emqx_http_lib.erl

@@ -108,11 +108,7 @@ normalise_headers(Headers0) ->
     [{K, proplists:get_value(K, Headers)} || K <- Keys].
 
 normalise_parse_result(#{host := Host, scheme := Scheme0} = Map) ->
-    Scheme = atom_scheme(Scheme0),
-    DefaultPort = case https =:= Scheme of
-                      true  -> 443;
-                      false -> 80
-                  end,
+    {Scheme, DefaultPort} = atom_scheme_and_default_port(Scheme0),
     Port = case maps:get(port, Map, undefined) of
                N when is_number(N) -> N;
                _ -> DefaultPort
@@ -122,11 +118,14 @@ normalise_parse_result(#{host := Host, scheme := Scheme0} = Map) ->
         , port => Port
         }.
 
-%% NOTE: so far we only support http schemes.
-atom_scheme(Scheme) when is_list(Scheme) -> atom_scheme(list_to_binary(Scheme));
-atom_scheme(<<"https">>) -> https;
-atom_scheme(<<"http">>) -> http;
-atom_scheme(Other) -> throw({unsupported_scheme, Other}).
+%% NOTE: so far we only support http/coap schemes.
+atom_scheme_and_default_port(Scheme) when is_list(Scheme) ->
+    atom_scheme_and_default_port(list_to_binary(Scheme));
+atom_scheme_and_default_port(<<"http">> ) -> {http,   80};
+atom_scheme_and_default_port(<<"https">>) -> {https, 443};
+atom_scheme_and_default_port(<<"coap">> ) -> {coap,  5683};
+atom_scheme_and_default_port(<<"coaps">>) -> {coaps, 5684};
+atom_scheme_and_default_port(Other) -> throw({unsupported_scheme, Other}).
 
 do_uri_encode(Char) ->
     case reserved(Char) of

+ 1 - 7
src/emqx_logger_textfmt.erl

@@ -35,15 +35,9 @@ format(#{msg := Msg0, meta := Meta} = Event, Config) ->
     logger_formatter:format(Event#{msg := Msg}, Config).
 
 maybe_merge({report, Report}, Meta) when is_map(Report) ->
-    {report, maps:merge(rename(Report), filter(Meta))};
+    {report, maps:merge(Report, filter(Meta))};
 maybe_merge(Report, _Meta) ->
     Report.
 
 filter(Meta) ->
     maps:without(?WITHOUT_MERGE, Meta).
-
-rename(#{'$kind' := Kind} = Meta0) -> % snabbkaffe
-    Meta = maps:remove('$kind', Meta0),
-    Meta#{msg => Kind};
-rename(Meta) ->
-    Meta.

+ 1 - 1
src/emqx_node_dump.erl

@@ -52,7 +52,7 @@ censor([Key | _], Val) ->
     end.
 
 is_sensitive(Key) when is_atom(Key) ->
-    is_sensitive(atom_to_binary(Key));
+    is_sensitive(atom_to_binary(Key, utf8));
 is_sensitive(Key) when is_list(Key) ->
     try iolist_to_binary(Key) of
         Bin ->

+ 12 - 5
src/emqx_plugins.erl

@@ -172,7 +172,14 @@ load_ext_plugin(PluginDir) ->
                       error({plugin_app_file_not_found, AppFile})
               end,
     ok = load_plugin_app(AppName, Ebin),
-    ok = load_plugin_conf(AppName, PluginDir).
+    try
+        ok = load_plugin_conf(AppName, PluginDir)
+    catch
+        throw : {conf_file_not_found, ConfFile} ->
+            %% this is maybe a dependency of an external plugin
+            ?LOG(debug, "config_load_error_ignored for app=~p, path=~s", [AppName, ConfFile]),
+            ok
+    end.
 
 load_plugin_app(AppName, Ebin) ->
     _ = code:add_patha(Ebin),
@@ -180,8 +187,8 @@ load_plugin_app(AppName, Ebin) ->
     lists:foreach(
         fun(BeamFile) ->
                 Module = list_to_atom(filename:basename(BeamFile, ".beam")),
-                case code:ensure_loaded(Module) of
-                    {module, Module} -> ok;
+                case code:load_file(Module) of
+                    {module, _} -> ok;
                     {error, Reason} -> error({failed_to_load_plugin_beam, BeamFile, Reason})
                 end
         end, Modules),
@@ -193,12 +200,12 @@ load_plugin_app(AppName, Ebin) ->
 load_plugin_conf(AppName, PluginDir) ->
     Priv = filename:join([PluginDir, "priv"]),
     Etc  = filename:join([PluginDir, "etc"]),
-    Schema = filelib:wildcard(filename:join([Priv, "*.schema"])),
     ConfFile = filename:join([Etc, atom_to_list(AppName) ++ ".conf"]),
     Conf = case filelib:is_file(ConfFile) of
                true -> cuttlefish_conf:file(ConfFile);
-               false -> error({conf_file_not_found, ConfFile})
+               false -> throw({conf_file_not_found, ConfFile})
            end,
+    Schema = filelib:wildcard(filename:join([Priv, "*.schema"])),
     ?LOG(debug, "loading_extra_plugin_config conf=~s, schema=~s", [ConfFile, Schema]),
     AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf),
     lists:foreach(fun({AppName1, Envs}) ->

+ 6 - 3
src/emqx_ws_connection.erl

@@ -257,13 +257,16 @@ websocket_init([Req, Opts]) ->
         case proplists:get_bool(proxy_protocol, Opts)
         andalso maps:get(proxy_header, Req) of
             #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
-                ProxyName = {SrcAddr, SrcPort},
+                SourceName = {SrcAddr, SrcPort},
                 %% Notice: Only CN is available in Proxy Protocol V2 additional info
-                ProxySSL = case maps:get(cn, SSL, undefined) of
+                SourceSSL = case maps:get(cn, SSL, undefined) of
                              undeined -> nossl;
                              CN -> [{pp2_ssl_cn, CN}]
                            end,
-                {ProxyName, ProxySSL};
+                {SourceName, SourceSSL};
+            #{src_address := SrcAddr, src_port := SrcPort} ->
+                SourceName = {SrcAddr, SrcPort},
+                {SourceName , nossl};
             _ ->
                 {get_peer(Req, Opts), cowboy_req:cert(Req)}
         end,

+ 14 - 10
test/emqx_cm_SUITE.erl

@@ -181,16 +181,20 @@ t_discard_session(_) ->
     ok = meck:unload(emqx_connection).
 
 t_discard_session_race(_) ->
-    ok = snabbkaffe:start_trace(),
-    #{conninfo := ConnInfo0} = ?ChanInfo,
-    ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
-    {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end),
-    ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo),
-    Pid ! stop,
-    receive {'DOWN', Ref, process, Pid, normal} -> ok end,
-    ok = emqx_cm:discard_session(<<"clientid">>),
-    {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000),
-    snabbkaffe:stop().
+    ?check_trace(
+       begin
+         #{conninfo := ConnInfo0} = ?ChanInfo,
+         ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
+         {Pid, Ref} = spawn_monitor(fun() -> receive stop -> exit(normal) end end),
+         ok = emqx_cm:register_channel(<<"clientid">>, Pid, ConnInfo),
+         Pid ! stop,
+         receive {'DOWN', Ref, process, Pid, normal} -> ok end,
+         ok = emqx_cm:discard_session(<<"clientid">>),
+         {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000)
+       end,
+       fun(_, _) ->
+               true
+       end).
 
 t_takeover_session(_) ->
     #{conninfo := ConnInfo} = ?ChanInfo,

+ 20 - 1
test/emqx_frame_SUITE.erl

@@ -58,7 +58,8 @@ groups() ->
        t_serialize_parse_connack_v5
       ]},
      {publish, [parallel],
-      [t_serialize_parse_qos0_publish,
+      [t_parse_sticky_frames,
+       t_serialize_parse_qos0_publish,
        t_serialize_parse_qos1_publish,
        t_serialize_parse_qos2_publish,
        t_serialize_parse_publish_v5
@@ -286,6 +287,24 @@ t_serialize_parse_connack_v5(_) ->
     Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props),
     ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
 
+t_parse_sticky_frames(_) ->
+    Payload = lists:duplicate(10, 0),
+    P = #mqtt_packet{header = #mqtt_packet_header{type   = ?PUBLISH,
+                                                  dup    = false,
+                                                  qos    = ?QOS_0,
+                                                  retain = false},
+                     variable = #mqtt_packet_publish{topic_name = <<"a/b">>,
+                                                     packet_id  = undefined},
+                     payload  = iolist_to_binary(Payload)
+                    },
+    Bin = serialize_to_binary(P),
+    Size = size(Bin),
+    <<H:(Size-2)/binary, TailTwoBytes/binary>> = Bin,
+    {more, PState1} = emqx_frame:parse(H), %% needs 2 more bytes
+    %% feed 3 bytes as if the next 1 byte belongs to the next packet.
+    {ok, _, <<42>>, PState2} = emqx_frame:parse(iolist_to_binary([TailTwoBytes, 42]), PState1),
+    ?assertMatch({none, _}, PState2).
+
 t_serialize_parse_qos0_publish(_) ->
     Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>,
     Packet = #mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,

+ 10 - 0
test/emqx_http_lib_tests.erl

@@ -66,6 +66,16 @@ uri_parse_test_() ->
                              emqx_http_lib:uri_parse("HTTPS://127.0.0.1"))
        end
       }
+    , {"coap default port",
+       fun() -> ?assertMatch({ok, #{scheme := coap, port := 5683}},
+                             emqx_http_lib:uri_parse("coap://127.0.0.1"))
+       end
+      }
+    , {"coaps default port",
+       fun() -> ?assertMatch({ok, #{scheme := coaps, port := 5684}},
+                             emqx_http_lib:uri_parse("coaps://127.0.0.1"))
+       end
+      }
     , {"unsupported_scheme",
        fun() -> ?assertEqual({error, {unsupported_scheme, <<"wss">>}},
                              emqx_http_lib:uri_parse("wss://127.0.0.1"))