|
|
@@ -70,14 +70,14 @@ lookup_proc(ClientId) when is_binary(ClientId) ->
|
|
|
%% @doc Register ClientId with Pid.
|
|
|
-spec(register(Client :: mqtt_client()) -> ok).
|
|
|
register(Client = #mqtt_client{client_id = ClientId}) ->
|
|
|
- CmPid = gproc_pool:pick_worker(?POOL, ClientId),
|
|
|
- gen_server2:cast(CmPid, {register, Client}).
|
|
|
+ gen_server2:call(pick(ClientId), {register, Client}, 120000).
|
|
|
|
|
|
%% @doc Unregister clientId with pid.
|
|
|
-spec(unregister(ClientId :: binary()) -> ok).
|
|
|
unregister(ClientId) when is_binary(ClientId) ->
|
|
|
- CmPid = gproc_pool:pick_worker(?POOL, ClientId),
|
|
|
- gen_server2:cast(CmPid, {unregister, ClientId, self()}).
|
|
|
+ gen_server2:cast(pick(ClientId), {unregister, ClientId, self()}).
|
|
|
+
|
|
|
+pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
@@ -85,16 +85,16 @@ unregister(ClientId) when is_binary(ClientId) ->
|
|
|
|
|
|
init([Pool, Id, StatsFun]) ->
|
|
|
?GPROC_POOL(join, Pool, Id),
|
|
|
- {ok, #state{pool = Pool, id = Id,
|
|
|
- statsfun = StatsFun,
|
|
|
- monitors = dict:new()}}.
|
|
|
+ {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}.
|
|
|
|
|
|
-prioritise_call(_Req, _From, _Len, _State) ->
|
|
|
- 1.
|
|
|
+prioritise_call(Req, _From, _Len, _State) ->
|
|
|
+ case Req of
|
|
|
+ {register, _Client} -> 2;
|
|
|
+ _ -> 1
|
|
|
+ end.
|
|
|
|
|
|
prioritise_cast(Msg, _Len, _State) ->
|
|
|
case Msg of
|
|
|
- {register, _Client} -> 2;
|
|
|
{unregister, _ClientId, _Pid} -> 9;
|
|
|
_ -> 1
|
|
|
end.
|
|
|
@@ -102,19 +102,19 @@ prioritise_cast(Msg, _Len, _State) ->
|
|
|
prioritise_info(_Msg, _Len, _State) ->
|
|
|
3.
|
|
|
|
|
|
-handle_call(Req, _From, State) ->
|
|
|
- ?UNEXPECTED_REQ(Req, State).
|
|
|
-
|
|
|
-handle_cast({register, Client = #mqtt_client{client_id = ClientId,
|
|
|
- client_pid = Pid}}, State) ->
|
|
|
+handle_call({register, Client = #mqtt_client{client_id = ClientId,
|
|
|
+ client_pid = Pid}}, _From, State) ->
|
|
|
case lookup_proc(ClientId) of
|
|
|
Pid ->
|
|
|
- {noreply, State};
|
|
|
+ {reply, ok, State};
|
|
|
_ ->
|
|
|
ets:insert(mqtt_client, Client),
|
|
|
- {noreply, setstats(monitor_client(ClientId, Pid, State))}
|
|
|
+ {reply, ok, setstats(monitor_client(ClientId, Pid, State))}
|
|
|
end;
|
|
|
|
|
|
+handle_call(Req, _From, State) ->
|
|
|
+ ?UNEXPECTED_REQ(Req, State).
|
|
|
+
|
|
|
handle_cast({unregister, ClientId, Pid}, State) ->
|
|
|
case lookup_proc(ClientId) of
|
|
|
Pid ->
|