|
|
@@ -336,9 +336,13 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
|
|
|
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
|
|
|
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
|
|
|
|
|
-handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
|
|
- #emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
|
|
- {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
|
|
+%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
|
|
|
+%% it `unsubscribed` the last topic.
|
|
|
+%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually
|
|
|
+%% be disconnected.
|
|
|
+% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
|
|
+% #emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
|
|
+% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
|
|
|
|
|
handle_info({mnesia_table_event, _Event}, State) ->
|
|
|
{noreply, State};
|