|
|
@@ -17,6 +17,8 @@
|
|
|
%% @doc The gateway connection registry
|
|
|
-module(emqx_gateway_cm_registry).
|
|
|
|
|
|
+-include("include/emqx_gateway.hrl").
|
|
|
+
|
|
|
-behaviour(gen_server).
|
|
|
|
|
|
-export([start_link/1]).
|
|
|
@@ -27,6 +29,8 @@
|
|
|
|
|
|
-export([lookup_channels/2]).
|
|
|
|
|
|
+-export([tabname/1]).
|
|
|
+
|
|
|
%% gen_server callbacks
|
|
|
-export([ init/1
|
|
|
, handle_call/3
|
|
|
@@ -41,39 +45,40 @@
|
|
|
|
|
|
-record(channel, {chid, pid}).
|
|
|
|
|
|
-%% @doc Start the global channel registry.
|
|
|
--spec(start_link(atom()) -> gen_server:startlink_ret()).
|
|
|
-start_link(Type) ->
|
|
|
- gen_server:start_link(?MODULE, [Type], []).
|
|
|
+%% @doc Start the global channel registry for the gived gateway name.
|
|
|
+-spec(start_link(gateway_name()) -> gen_server:startlink_ret()).
|
|
|
+start_link(Name) ->
|
|
|
+ gen_server:start_link(?MODULE, [Name], []).
|
|
|
|
|
|
--spec tabname(atom()) -> atom().
|
|
|
-tabname(Type) ->
|
|
|
- list_to_atom(lists:concat([emqx_gateway_, Type, '_channel_registry'])).
|
|
|
+-spec tabname(gateway_name()) -> atom().
|
|
|
+tabname(Name) ->
|
|
|
+ %% XXX: unsafe ??
|
|
|
+ list_to_atom(lists:concat([emqx_gateway_, Name, '_channel_registry'])).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% APIs
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
%% @doc Register a global channel.
|
|
|
--spec register_channel(atom(), binary() | {binary(), pid()}) -> ok.
|
|
|
-register_channel(Type, ClientId) when is_binary(ClientId) ->
|
|
|
- register_channel(Type, {ClientId, self()});
|
|
|
+-spec register_channel(gateway_name(), binary() | {binary(), pid()}) -> ok.
|
|
|
+register_channel(Name, ClientId) when is_binary(ClientId) ->
|
|
|
+ register_channel(Name, {ClientId, self()});
|
|
|
|
|
|
-register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
|
|
|
- mria:dirty_write(tabname(Type), record(ClientId, ChanPid)).
|
|
|
+register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
|
|
|
+ mria:dirty_write(tabname(Name), record(ClientId, ChanPid)).
|
|
|
|
|
|
%% @doc Unregister a global channel.
|
|
|
--spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok.
|
|
|
-unregister_channel(Type, ClientId) when is_binary(ClientId) ->
|
|
|
- unregister_channel(Type, {ClientId, self()});
|
|
|
+-spec unregister_channel(gateway_name(), binary() | {binary(), pid()}) -> ok.
|
|
|
+unregister_channel(Name, ClientId) when is_binary(ClientId) ->
|
|
|
+ unregister_channel(Name, {ClientId, self()});
|
|
|
|
|
|
-unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
|
|
|
- mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)).
|
|
|
+unregister_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
|
|
|
+ mria:dirty_delete_object(tabname(Name), record(ClientId, ChanPid)).
|
|
|
|
|
|
%% @doc Lookup the global channels.
|
|
|
--spec lookup_channels(atom(), binary()) -> list(pid()).
|
|
|
-lookup_channels(Type, ClientId) ->
|
|
|
- [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Type), ClientId)].
|
|
|
+-spec lookup_channels(gateway_name(), binary()) -> list(pid()).
|
|
|
+lookup_channels(Name, ClientId) ->
|
|
|
+ [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)].
|
|
|
|
|
|
record(ClientId, ChanPid) ->
|
|
|
#channel{chid = ClientId, pid = ChanPid}.
|
|
|
@@ -82,8 +87,8 @@ record(ClientId, ChanPid) ->
|
|
|
%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-init([Type]) ->
|
|
|
- Tab = tabname(Type),
|
|
|
+init([Name]) ->
|
|
|
+ Tab = tabname(Name),
|
|
|
ok = mria:create_table(Tab, [
|
|
|
{type, bag},
|
|
|
{rlog_shard, ?CM_SHARD},
|
|
|
@@ -94,7 +99,7 @@ init([Type]) ->
|
|
|
{write_concurrency, true}]}]}]),
|
|
|
ok = mria:wait_for_tables([Tab]),
|
|
|
ok = ekka:monitor(membership),
|
|
|
- {ok, #{type => Type}}.
|
|
|
+ {ok, #{name => Name}}.
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
logger:error("Unexpected call: ~p", [Req]),
|
|
|
@@ -104,8 +109,8 @@ handle_cast(Msg, State) ->
|
|
|
logger:error("Unexpected cast: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) ->
|
|
|
- Tab = tabname(Type),
|
|
|
+handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) ->
|
|
|
+ Tab = tabname(Name),
|
|
|
global:trans({?LOCK, self()},
|
|
|
fun() ->
|
|
|
mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab])
|