Ver código fonte

Fix will message mechanism for websocket channel

GilbertWong 6 anos atrás
pai
commit
234037aee1

+ 4 - 4
src/emqx_session.erl

@@ -301,10 +301,10 @@ pubcomp(SPid, PacketId, ReasonCode) ->
 
 -spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok).
 unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
-   TopicFilters = lists:map(fun({RawTopic, Opts}) ->
-                                    emqx_topic:parse(RawTopic, Opts);
-                               (RawTopic) when is_binary(RawTopic) ->
-                                    emqx_topic:parse(RawTopic)
+    TopicFilters = lists:map(fun({RawTopic, Opts}) ->
+                                     emqx_topic:parse(RawTopic, Opts);
+                                (RawTopic) when is_binary(RawTopic) ->
+                                     emqx_topic:parse(RawTopic)
                             end, RawTopicFilters),
     unsubscribe(SPid, undefined, #{}, TopicFilters).
 

+ 6 - 5
src/emqx_ws_channel.erl

@@ -312,9 +312,12 @@ terminate(SockError, _Req, #state{keepalive   = Keepalive,
     case {ProtoState, Shutdown} of
         {undefined, _} -> ok;
         {_, {shutdown, Reason}} ->
-            emqx_protocol:terminate(Reason, ProtoState);
+            emqx_protocol:terminate(Reason, ProtoState),
+            exit(Reason);
         {_, Error} ->
-            emqx_protocol:terminate(Error, ProtoState)
+            ?LOG(error, "Un expected terminated for ~p", [Error]),
+            emqx_protocol:terminate(Error, ProtoState),
+            exit(unknown)
     end.
 
 %%--------------------------------------------------------------------
@@ -334,8 +337,6 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) ->
             shutdown(Error, State#state{proto_state = NProtoState})
     end.
 
-
-
 ensure_stats_timer(State = #state{enable_stats = true,
                                   stats_timer  = undefined,
                                   idle_timeout = IdleTimeout}) ->
@@ -345,7 +346,7 @@ ensure_stats_timer(State) ->
 
 shutdown(Reason, State) ->
     %% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696)
-    self() ! {stop, Reason},
+    self() ! {stop, State#state{shutdown = {shutdown, Reason}}},
     {ok, State}.
 
 wsock_stats() ->

+ 29 - 0
test/emqx_ws_channel_SUITE.erl

@@ -28,10 +28,23 @@
                                 username  = <<"admin">>,
                                 password  = <<"public">>})).
 
+-define(WILL_TOPIC, <<"test/websocket/will">>).
+
+-define(WILL_CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
+                                     client_id = <<"mqtt_client">>,
+                                     username  = <<"admin">>,
+                                     password  = <<"public">>,
+                                     will_flag = true,
+                                     will_qos = ?QOS_1,
+                                     will_topic = ?WILL_TOPIC,
+                                     will_payload = <<"payload">>
+                                   })).
+
 all() ->
     [ t_ws_connect_api
     , t_ws_auth_failure
     , t_ws_other_type_frame
+    , t_ws_will
     ].
 
 init_per_suite(Config) ->
@@ -41,6 +54,22 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
+t_ws_will(_Config) ->
+    {ok, ClientPid} = emqx_client:start_link(),
+    {ok, _} = emqx_client:connect(ClientPid),
+    {ok, _, [1]} = emqx_client:subscribe(ClientPid, ?WILL_TOPIC, qos1),
+    WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
+    {ok, _} = rfc6455_client:open(WS),
+    Packet = raw_send_serialize(?WILL_CLIENT),
+    ok = rfc6455_client:send_binary(WS, Packet),
+    {binary, Bin} = rfc6455_client:recv(WS),
+    Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT),
+    {ok, Connack, <<>>, _} = raw_recv_pase(Bin),
+    exit(WS, abnomal),
+    ?assertEqual(1, length(emqx_client_SUITE:receive_messages(1))),
+    ok = emqx_client:disconnect(ClientPid),
+    ok.
+
 t_ws_auth_failure(_Config) ->
     application:set_env(emqx, allow_anonymous, false),
     WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),

+ 0 - 1
test/rfc6455_client.erl

@@ -186,7 +186,6 @@ do_close(State = #state{socket = Socket}, {Code, Reason}) ->
     gen_tcp:send(Socket, encode_frame(1, 8, Payload)),
     State#state{phase = closing}.
 
-
 loop(State = #state{socket = Socket, ppid = PPid, data = Data,
                     phase = Phase}) ->
     receive