|
|
@@ -22,6 +22,8 @@
|
|
|
%%% @doc
|
|
|
%%% emqttd core pubsub.
|
|
|
%%%
|
|
|
+%%% TODO: should not use gen_server:call to create, subscribe topics...
|
|
|
+%%%
|
|
|
%%% @end
|
|
|
%%%-----------------------------------------------------------------------------
|
|
|
-module(emqttd_pubsub).
|
|
|
@@ -46,8 +48,8 @@
|
|
|
|
|
|
-export([topics/0,
|
|
|
create/1,
|
|
|
- subscribe/2,
|
|
|
- unsubscribe/2,
|
|
|
+ subscribe/1,
|
|
|
+ unsubscribe/1,
|
|
|
publish/1,
|
|
|
publish/2,
|
|
|
%local node
|
|
|
@@ -106,8 +108,8 @@ topics() ->
|
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}.
|
|
|
-create(Topic) ->
|
|
|
- gen_server:call(?SERVER, {create, Topic}).
|
|
|
+create(Topic) when is_binary(Topic) ->
|
|
|
+ {atomic, ok} = mnesia:transaction(fun trie_add/1, [Topic]), ok.
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc
|
|
|
@@ -115,12 +117,25 @@ create(Topic) ->
|
|
|
%%
|
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
|
--spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}.
|
|
|
-subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
|
|
- subscribe([{Topic, Qos}], SubPid);
|
|
|
-
|
|
|
-subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
|
|
|
- gen_server:call(?SERVER, {subscribe, Topics, SubPid}).
|
|
|
+-spec subscribe({binary(), mqtt_qos()} | list()) -> {ok, list(mqtt_qos())}.
|
|
|
+subscribe({Topic, Qos}) when is_binary(Topic) ->
|
|
|
+ case subscribe([{Topic, Qos}]) of
|
|
|
+ {ok, [GrantedQos]} -> {ok, GrantedQos};
|
|
|
+ {error, Error} -> {error, Error}
|
|
|
+ end;
|
|
|
+subscribe(Topics = [{_Topic, _Qos}|_]) ->
|
|
|
+ subscribe(Topics, self(), []).
|
|
|
+
|
|
|
+subscribe([], _SubPid, Acc) ->
|
|
|
+ {ok, lists:reverse(Acc)};
|
|
|
+%%TODO: check this function later.
|
|
|
+subscribe([{Topic, Qos}|Topics], SubPid, Acc) ->
|
|
|
+ Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid},
|
|
|
+ F = fun() -> trie_add(Topic), mnesia:write(Subscriber) end,
|
|
|
+ case mnesia:transaction(F) of
|
|
|
+ {atomic, ok} -> subscribe(Topics, SubPid, [Qos|Acc]);
|
|
|
+ Error -> {error, Error}
|
|
|
+ end.
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc
|
|
|
@@ -128,12 +143,27 @@ subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
|
|
|
%%
|
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
|
--spec unsubscribe(binary() | list(binary()), pid()) -> ok.
|
|
|
-unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
|
|
- unsubscribe([Topic], SubPid);
|
|
|
-
|
|
|
-unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
|
|
|
- gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}).
|
|
|
+-spec unsubscribe(binary() | list(binary())) -> ok.
|
|
|
+unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
+ unsubscribe([Topic]);
|
|
|
+
|
|
|
+unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) ->
|
|
|
+ unsubscribe(Topics, self()).
|
|
|
+
|
|
|
+%%TODO: check this function later.
|
|
|
+unsubscribe(Topics, SubPid) ->
|
|
|
+ F = fun() ->
|
|
|
+ Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid),
|
|
|
+ lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) ->
|
|
|
+ case lists:member(Topic, Topics) of
|
|
|
+ true -> mneisa:delete_object(Sub);
|
|
|
+ false -> ok
|
|
|
+ end
|
|
|
+ end, Subscribers)
|
|
|
+ %TODO: try to remove topic??? if topic is dynamic...
|
|
|
+ %%try_remove_topic(Topic)
|
|
|
+ end,
|
|
|
+ {atomic, _} = mneisa:transaction(F), ok.
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc
|
|
|
@@ -164,7 +194,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
|
|
|
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
|
|
- Subscribers = ets:lookup(topic_subscriber, Topic),
|
|
|
+ Subscribers = mnesia:dirty_read(topic_subscriber, Topic),
|
|
|
lists:foreach(
|
|
|
fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) ->
|
|
|
Msg1 = if
|
|
|
@@ -193,72 +223,79 @@ match(Topic) when is_binary(Topic) ->
|
|
|
%% ------------------------------------------------------------------
|
|
|
|
|
|
init([]) ->
|
|
|
- mnesia:create_table(topic_trie, [
|
|
|
- {ram_copies, [node()]},
|
|
|
- {attributes, record_info(fields, topic_trie)}]),
|
|
|
+ %% trie and topic tables, will be copied by all nodes.
|
|
|
mnesia:create_table(topic_trie_node, [
|
|
|
{ram_copies, [node()]},
|
|
|
{attributes, record_info(fields, topic_trie_node)}]),
|
|
|
+ mnesia:add_table_copy(topic_trie_node, node(), ram_copies),
|
|
|
+ mnesia:create_table(topic_trie, [
|
|
|
+ {ram_copies, [node()]},
|
|
|
+ {attributes, record_info(fields, topic_trie)}]),
|
|
|
+ mnesia:add_table_copy(topic_trie, node(), ram_copies),
|
|
|
mnesia:create_table(topic, [
|
|
|
{type, bag},
|
|
|
{record_name, topic},
|
|
|
{ram_copies, [node()]},
|
|
|
{attributes, record_info(fields, topic)}]),
|
|
|
- mnesia:add_table_copy(topic_trie, node(), ram_copies),
|
|
|
- mnesia:add_table_copy(topic_trie_node, node(), ram_copies),
|
|
|
mnesia:add_table_copy(topic, node(), ram_copies),
|
|
|
- ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]),
|
|
|
+ mnesia:subscribe({table, topic, simple}),
|
|
|
+ %% local table, not shared with other table
|
|
|
+ mnesia:create_table(topic_subscriber, [
|
|
|
+ {type, bag},
|
|
|
+ {record_name, topic_subscriber},
|
|
|
+ {ram_copies, [node()]},
|
|
|
+ {attributes, record_info(fields, topic_subscriber)},
|
|
|
+ {index, [subpid]},
|
|
|
+ {local_content, true}]),
|
|
|
+ mnesia:subscribe({table, topic_subscriber, simple}),
|
|
|
{ok, #state{}}.
|
|
|
|
|
|
handle_call(getstats, _From, State = #state{max_subs = Max}) ->
|
|
|
Stats = [{'topics/count', mnesia:table_info(topic, size)},
|
|
|
- {'subscribers/count', ets:info(topic_subscriber, size)},
|
|
|
+ {'subscribers/count', mnesia:info(topic_subscriber, size)},
|
|
|
{'subscribers/max', Max}],
|
|
|
{reply, Stats, State};
|
|
|
|
|
|
-handle_call({create, Topic}, _From, State) ->
|
|
|
- Result = mnesia:transaction(fun trie_add/1, [Topic]),
|
|
|
- {reply, Result, setstats(State)};
|
|
|
-
|
|
|
-handle_call({subscribe, Topics, SubPid}, _From, State) ->
|
|
|
- Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics],
|
|
|
- Reply =
|
|
|
- case [Err || Err = {error, _} <- Result] of
|
|
|
- [] -> {ok, [Qos || {ok, Qos} <- Result]};
|
|
|
- Errors -> hd(Errors)
|
|
|
- end,
|
|
|
- {reply, Reply, setstats(State)};
|
|
|
-
|
|
|
handle_call(Req, _From, State) ->
|
|
|
- {stop, {badreq, Req}, State}.
|
|
|
-
|
|
|
-handle_cast({unsubscribe, Topics, SubPid}, State) ->
|
|
|
- lists:foreach(fun(Topic) ->
|
|
|
- ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}),
|
|
|
- try_remove_topic(Topic)
|
|
|
- end, Topics),
|
|
|
- {noreply, setstats(State)};
|
|
|
+ lager:error("Bad Req: ~p", [Req]),
|
|
|
+ {reply, error, State}.
|
|
|
|
|
|
handle_cast(Msg, State) ->
|
|
|
- {stop, {badmsg, Msg}, State}.
|
|
|
-
|
|
|
-handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) ->
|
|
|
- case get({submon, Mon}) of
|
|
|
- undefined ->
|
|
|
- lager:error("unexpected 'DOWN': ~p", [Mon]);
|
|
|
- SubPid ->
|
|
|
- erase({submon, Mon}),
|
|
|
- erase({subscriber, SubPid}),
|
|
|
- Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}),
|
|
|
- [ets:delete_object(topic_subscriber, Sub) || Sub <- Subs],
|
|
|
- [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs]
|
|
|
- end,
|
|
|
+ lager:error("Bad Msg: ~p", [Msg]),
|
|
|
+ {noreply, State}.
|
|
|
+
|
|
|
+%% a new record has been written.
|
|
|
+handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) ->
|
|
|
+ erlang:monitor(process, Pid),
|
|
|
+ {noreply, setstats(State)};
|
|
|
+
|
|
|
+%% {write, #topic{}, _ActivityId}
|
|
|
+%% {delete_object, _OldRecord, _ActivityId}
|
|
|
+%% {delete, {Tab, Key}, ActivityId}
|
|
|
+handle_info({mnesia_table_event, _Event}, State) ->
|
|
|
+ {noreply, setstats(State)};
|
|
|
+
|
|
|
+handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
|
|
+ F = fun() ->
|
|
|
+ %%TODO: how to read with write lock?
|
|
|
+ [mnesia:delete_object(Sub) || Sub <- mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid)]
|
|
|
+ %%TODO: try to remove dynamic topics without subscribers
|
|
|
+ %% [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs]
|
|
|
+ end,
|
|
|
+ case catch mnesia:transaction(F) of
|
|
|
+ {atomic, _} -> ok;
|
|
|
+ {aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason])
|
|
|
+ end,
|
|
|
{noreply, setstats(State)};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
- {stop, {badinfo, Info}, State}.
|
|
|
+ lager:error("Bad Info: ~p", [Info]),
|
|
|
+ {noreply, State}.
|
|
|
|
|
|
terminate(_Reason, _State) ->
|
|
|
+ mnesia:unsubscribe({table, topic, simple}),
|
|
|
+ mnesia:unsubscribe({table, topic_subscriber, simple}),
|
|
|
+ %%TODO: clear topics belongs to this node???
|
|
|
ok.
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
@@ -267,57 +304,22 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%%%=============================================================================
|
|
|
%%% Internal functions
|
|
|
%%%=============================================================================
|
|
|
-subscribe_topic({Topic, Qos}, SubPid) ->
|
|
|
- case mnesia:transaction(fun trie_add/1, [Topic]) of
|
|
|
- {atomic, _} ->
|
|
|
- case get({subscriber, SubPid}) of
|
|
|
- undefined ->
|
|
|
- %%TODO: refactor later...
|
|
|
- MonRef = erlang:monitor(process, SubPid),
|
|
|
- put({subcriber, SubPid}, MonRef),
|
|
|
- put({submon, MonRef}, SubPid);
|
|
|
- _ ->
|
|
|
- already_monitored
|
|
|
- end,
|
|
|
- %% remove duplicated subscribers
|
|
|
- try_remove_subscriber({Topic, Qos}, SubPid),
|
|
|
- ets:insert(topic_subscriber, #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid}),
|
|
|
- %TODO: GrantedQos??
|
|
|
- {ok, Qos};
|
|
|
- {aborted, Reason} ->
|
|
|
- {error, Reason}
|
|
|
- end.
|
|
|
|
|
|
-try_remove_subscriber({Topic, Qos}, SubPid) ->
|
|
|
- case ets:lookup(topic_subscriber, Topic) of
|
|
|
- [] ->
|
|
|
- not_found;
|
|
|
- Subs ->
|
|
|
- DupSubs = [Sub || Sub = #topic_subscriber{qos = OldQos, subpid = OldPid}
|
|
|
- <- Subs, Qos =/= OldQos, OldPid =:= SubPid],
|
|
|
- case DupSubs of
|
|
|
- [] -> ok;
|
|
|
- [DupSub] ->
|
|
|
- lager:warning("PubSub: remove duplicated subscriber ~p", [DupSub]),
|
|
|
- ets:delete(topic_subscriber, DupSub)
|
|
|
- end
|
|
|
- end.
|
|
|
-
|
|
|
-try_remove_topic(Name) when is_binary(Name) ->
|
|
|
- case ets:member(topic_subscriber, Name) of
|
|
|
- false ->
|
|
|
- Topic = emqttd_topic:new(Name),
|
|
|
- Fun = fun() ->
|
|
|
- mnesia:delete_object(Topic),
|
|
|
- case mnesia:read(topic, Name) of
|
|
|
- [] -> trie_delete(Name);
|
|
|
- _ -> ignore
|
|
|
- end
|
|
|
- end,
|
|
|
- mnesia:transaction(Fun);
|
|
|
- true ->
|
|
|
- ok
|
|
|
- end.
|
|
|
+%%try_remove_topic(Name) when is_binary(Name) ->
|
|
|
+%% case ets:member(topic_subscriber, Name) of
|
|
|
+%% false ->
|
|
|
+%% Topic = emqttd_topic:new(Name),
|
|
|
+%% Fun = fun() ->
|
|
|
+%% mnesia:delete_object(Topic),
|
|
|
+%% case mnesia:read(topic, Name) of
|
|
|
+%% [] -> trie_delete(Name);
|
|
|
+%% _ -> ignore
|
|
|
+%% end
|
|
|
+%% end,
|
|
|
+%% mnesia:transaction(Fun);
|
|
|
+%% true ->
|
|
|
+%% ok
|
|
|
+%% end.
|
|
|
|
|
|
trie_add(Topic) when is_binary(Topic) ->
|
|
|
mnesia:write(emqttd_topic:new(Topic)),
|
|
|
@@ -325,7 +327,7 @@ trie_add(Topic) when is_binary(Topic) ->
|
|
|
[TrieNode=#topic_trie_node{topic=undefined}] ->
|
|
|
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
|
|
|
[#topic_trie_node{topic=Topic}] ->
|
|
|
- {atomic, already_exist};
|
|
|
+ ok;
|
|
|
[] ->
|
|
|
%add trie path
|
|
|
[trie_add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
|
|
|
@@ -401,7 +403,7 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) ->
|
|
|
|
|
|
setstats(State = #state{max_subs = Max}) ->
|
|
|
emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)),
|
|
|
- SubCount = ets:info(topic_subscriber, size),
|
|
|
+ SubCount = mnesia:table_info(topic_subscriber, size),
|
|
|
emqttd_broker:setstat('subscribers/count', SubCount),
|
|
|
if
|
|
|
SubCount > Max ->
|