|
|
@@ -118,8 +118,8 @@ info(ctx, #channel{ctx = Ctx}) ->
|
|
|
Ctx.
|
|
|
|
|
|
-spec stats(channel()) -> emqx_types:stats().
|
|
|
-stats(_) ->
|
|
|
- [].
|
|
|
+stats(#channel{session = Session}) ->
|
|
|
+ emqx_coap_session:stats(Session).
|
|
|
|
|
|
-spec init(map(), map()) -> channel().
|
|
|
init(
|
|
|
@@ -273,7 +273,7 @@ handle_call(
|
|
|
SubReq, TempMsg, #{}, Session
|
|
|
),
|
|
|
NSession = maps:get(session, Result),
|
|
|
- {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}};
|
|
|
+ {reply, {ok, {MountedTopic, NSubOpts}}, [{event, updated}], Channel#channel{session = NSession}};
|
|
|
handle_call(
|
|
|
{unsubscribe, Topic},
|
|
|
_From,
|
|
|
@@ -300,7 +300,7 @@ handle_call(
|
|
|
UnSubReq, TempMsg, #{}, Session
|
|
|
),
|
|
|
NSession = maps:get(session, Result),
|
|
|
- {reply, ok, Channel#channel{session = NSession}};
|
|
|
+ {reply, ok, [{event, updated}], Channel#channel{session = NSession}};
|
|
|
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
|
|
|
Subs = emqx_coap_session:info(subscriptions, Session),
|
|
|
{reply, {ok, maps:to_list(Subs)}, Channel};
|
|
|
@@ -486,7 +486,6 @@ enrich_conninfo(
|
|
|
conninfo = ConnInfo
|
|
|
}
|
|
|
) ->
|
|
|
- %% FIXME: generate a random clientid if absent
|
|
|
case Queries of
|
|
|
#{<<"clientid">> := ClientId} ->
|
|
|
Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
|
|
|
@@ -500,32 +499,20 @@ enrich_conninfo(
|
|
|
},
|
|
|
{ok, Channel#channel{conninfo = NConnInfo}};
|
|
|
_ ->
|
|
|
- {error, "invalid queries", Channel}
|
|
|
+ {error, "clientid is required", Channel}
|
|
|
end.
|
|
|
|
|
|
enrich_clientinfo(
|
|
|
{Queries, Msg},
|
|
|
- Channel = #channel{clientinfo = ClientInfo0}
|
|
|
+ Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo0}
|
|
|
) ->
|
|
|
- %% FIXME:
|
|
|
- %% 1. generate a random clientid if absent;
|
|
|
- %% 2. assgin username, password to `undefined` if absent
|
|
|
- case Queries of
|
|
|
- #{
|
|
|
- <<"username">> := UserName,
|
|
|
- <<"password">> := Password,
|
|
|
- <<"clientid">> := ClientId
|
|
|
- } ->
|
|
|
- ClientInfo = ClientInfo0#{
|
|
|
- username => UserName,
|
|
|
- password => Password,
|
|
|
- clientid => ClientId
|
|
|
- },
|
|
|
- {ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
|
|
|
- {ok, Channel#channel{clientinfo = NClientInfo}};
|
|
|
- _ ->
|
|
|
- {error, "invalid queries", Channel}
|
|
|
- end.
|
|
|
+ ClientInfo = ClientInfo0#{
|
|
|
+ clientid => maps:get(clientid, ConnInfo),
|
|
|
+ username => maps:get(<<"username">>, Queries, undefined),
|
|
|
+ password => maps:get(<<"password">>, Queries, undefined)
|
|
|
+ },
|
|
|
+ {ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
|
|
|
+ {ok, Channel#channel{clientinfo = NClientInfo}}.
|
|
|
|
|
|
set_log_meta(_Input, #channel{clientinfo = #{clientid := ClientId}}) ->
|
|
|
emqx_logger:set_metadata_clientid(ClientId),
|