|
|
@@ -51,6 +51,8 @@
|
|
|
inflight :: emqx_inflight:inflight(),
|
|
|
%% Message Queue
|
|
|
mqueue :: queue:queue(),
|
|
|
+ %% Subscriptions
|
|
|
+ subscriptions :: map(),
|
|
|
retx_interval,
|
|
|
retx_max_times,
|
|
|
max_mqueue_len
|
|
|
@@ -115,7 +117,7 @@ stats(#channel{inflight = Inflight, mqueue = Queue}) ->
|
|
|
%% XXX: A fake stats for managed by emqx_management
|
|
|
[
|
|
|
{subscriptions_cnt, 1},
|
|
|
- {subscriptions_max, 0},
|
|
|
+ {subscriptions_max, 1},
|
|
|
{inflight_cnt, emqx_inflight:size(Inflight)},
|
|
|
{inflight_max, emqx_inflight:max_size(Inflight)},
|
|
|
{mqueue_len, queue:len(Queue)},
|
|
|
@@ -181,6 +183,7 @@ init(
|
|
|
clientinfo = ClientInfo,
|
|
|
inflight = emqx_inflight:new(1),
|
|
|
mqueue = queue:new(),
|
|
|
+ subscriptions = #{},
|
|
|
timers = #{},
|
|
|
conn_state = idle,
|
|
|
retx_interval = RetxInterv,
|
|
|
@@ -370,9 +373,15 @@ handle_call(kick, _From, Channel) ->
|
|
|
disconnect_and_shutdown(kicked, ok, Channel1);
|
|
|
handle_call(discard, _From, Channel) ->
|
|
|
disconnect_and_shutdown(discarded, ok, Channel);
|
|
|
+handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) ->
|
|
|
+ reply({error, not_support}, Channel);
|
|
|
+handle_call({unsubscribe, _Topic}, _From, Channel) ->
|
|
|
+ reply({error, not_found}, Channel);
|
|
|
+handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subscriptions}) ->
|
|
|
+ reply({ok, maps:to_list(Subscriptions)}, Channel);
|
|
|
handle_call(Req, _From, Channel) ->
|
|
|
log(error, #{msg => "unexpected_call", call => Req}, Channel),
|
|
|
- reply(ignored, Channel).
|
|
|
+ reply({error, unexpected_call}, Channel).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Handle cast
|
|
|
@@ -606,8 +615,8 @@ process_connect(
|
|
|
)
|
|
|
of
|
|
|
{ok, #{session := Session}} ->
|
|
|
- NChannel = Channel#channel{session = Session},
|
|
|
- subscribe_downlink(?DEFAULT_DOWNLINK_TOPIC, Channel),
|
|
|
+ NChannel0 = Channel#channel{session = Session},
|
|
|
+ NChannel = subscribe_downlink(?DEFAULT_DOWNLINK_TOPIC, NChannel0),
|
|
|
_ = upstreaming(Frame, NChannel),
|
|
|
%% XXX: connection_accepted is not defined by stomp protocol
|
|
|
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]),
|
|
|
@@ -854,11 +863,13 @@ subscribe_downlink(
|
|
|
#{
|
|
|
clientid := ClientId,
|
|
|
mountpoint := Mountpoint
|
|
|
- }
|
|
|
- }
|
|
|
+ },
|
|
|
+ subscriptions = Subscriptions
|
|
|
+ } = Channel
|
|
|
) ->
|
|
|
{ParsedTopic, SubOpts0} = emqx_topic:parse(Topic),
|
|
|
SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts0),
|
|
|
MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic),
|
|
|
_ = emqx_broker:subscribe(MountedTopic, ClientId, SubOpts),
|
|
|
- run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]).
|
|
|
+ run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]),
|
|
|
+ Channel#channel{subscriptions = Subscriptions#{MountedTopic => SubOpts}}.
|