|
|
@@ -42,9 +42,9 @@
|
|
|
%% ws_initial_headers: Headers from first HTTP request for WebSocket Client.
|
|
|
-record(proto_state, {peername, sendfun, connected = false, client_id, client_pid,
|
|
|
clean_sess, proto_ver, proto_name, username, is_superuser,
|
|
|
- will_msg, keepalive, max_clientid_len, session, stats_data,
|
|
|
- keepalive_backoff, peercert_username, ws_initial_headers,
|
|
|
- mountpoint, connected_at}).
|
|
|
+ will_msg, keepalive, keepalive_backoff, max_clientid_len,
|
|
|
+ session, stats_data, mountpoint, ws_initial_headers,
|
|
|
+ is_bridge, connected_at}).
|
|
|
|
|
|
-type(proto_state() :: #proto_state{}).
|
|
|
|
|
|
@@ -201,17 +201,18 @@ process(?CONNECT_PACKET(Var), State0) ->
|
|
|
password = Password,
|
|
|
clean_sess = CleanSess,
|
|
|
keep_alive = KeepAlive,
|
|
|
- client_id = ClientId} = Var,
|
|
|
-
|
|
|
- State1 = repl_username_with_peercert(
|
|
|
- State0#proto_state{proto_ver = ProtoVer,
|
|
|
- proto_name = ProtoName,
|
|
|
- username = Username,
|
|
|
- client_id = ClientId,
|
|
|
- clean_sess = CleanSess,
|
|
|
- keepalive = KeepAlive,
|
|
|
- will_msg = willmsg(Var, State0),
|
|
|
- connected_at = os:timestamp()}),
|
|
|
+ client_id = ClientId,
|
|
|
+ is_bridge = IsBridge} = Var,
|
|
|
+
|
|
|
+ State1 = State0#proto_state{proto_ver = ProtoVer,
|
|
|
+ proto_name = ProtoName,
|
|
|
+ username = Username,
|
|
|
+ client_id = ClientId,
|
|
|
+ clean_sess = CleanSess,
|
|
|
+ keepalive = KeepAlive,
|
|
|
+ will_msg = willmsg(Var, State0),
|
|
|
+ is_bridge = IsBridge,
|
|
|
+ connected_at = os:timestamp()},
|
|
|
|
|
|
{ReturnCode1, SessPresent, State3} =
|
|
|
case validate_connect(Var, State1) of
|
|
|
@@ -233,7 +234,7 @@ process(?CONNECT_PACKET(Var), State0) ->
|
|
|
%% ACCEPT
|
|
|
{?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
|
|
|
{error, Error} ->
|
|
|
- exit({shutdown, Error})
|
|
|
+ {stop, {shutdown, Error}, State2}
|
|
|
end;
|
|
|
{error, Reason}->
|
|
|
?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1),
|
|
|
@@ -355,10 +356,11 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
|
|
|
-spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
|
|
|
send(Msg, State = #proto_state{client_id = ClientId,
|
|
|
username = Username,
|
|
|
- mountpoint = MountPoint})
|
|
|
+ mountpoint = MountPoint,
|
|
|
+ is_bridge = IsBridge})
|
|
|
when is_record(Msg, mqtt_message) ->
|
|
|
emqx_hooks:run('message.delivered', [ClientId, Username], Msg),
|
|
|
- send(emqx_message:to_packet(unmount(MountPoint, Msg)), State);
|
|
|
+ send(emqx_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
|
|
|
|
|
|
send(Packet = ?PACKET(Type),
|
|
|
State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
|
|
|
@@ -400,12 +402,13 @@ stop_if_auth_failure(_RC, State) ->
|
|
|
|
|
|
shutdown(_Error, #proto_state{client_id = undefined}) ->
|
|
|
ignore;
|
|
|
-
|
|
|
-shutdown(conflict, #proto_state{client_id = _ClientId}) ->
|
|
|
+shutdown(conflict, _State) ->
|
|
|
+ %% let it down
|
|
|
+ ignore;
|
|
|
+shutdown(mnesia_conflict, _State) ->
|
|
|
%% let it down
|
|
|
%% emqx_cm:unreg(ClientId);
|
|
|
ignore;
|
|
|
-
|
|
|
shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
|
|
|
?LOG(info, "Shutdown for ~p", [Error], State),
|
|
|
Client = client(State),
|
|
|
@@ -565,6 +568,15 @@ check_acl(subscribe, Topic, Client) ->
|
|
|
sp(true) -> 1;
|
|
|
sp(false) -> 0.
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% The retained flag should be propagated for bridge.
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+
|
|
|
+clean_retain(false, Msg = #mqtt_message{retain = true}) ->
|
|
|
+ Msg#mqtt_message{retain = false};
|
|
|
+clean_retain(_IsBridge, Msg) ->
|
|
|
+ Msg.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Mount Point
|
|
|
%%--------------------------------------------------------------------
|