|
|
@@ -43,7 +43,8 @@
|
|
|
%% API Exports
|
|
|
-export([start_link/3]).
|
|
|
|
|
|
--export([create/1, subscribe/1, subscribe/2, unsubscribe/1, publish/1]).
|
|
|
+-export([create/1, subscribe/1, subscribe/2,
|
|
|
+ unsubscribe/1, unsubscribe/2, publish/1]).
|
|
|
|
|
|
%% Local node
|
|
|
-export([match/1]).
|
|
|
@@ -52,25 +53,35 @@
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
terminate/2, code_change/3]).
|
|
|
|
|
|
+-ifdef(TEST).
|
|
|
+-compile(export_all).
|
|
|
+-endif.
|
|
|
+
|
|
|
-record(state, {pool, id}).
|
|
|
|
|
|
-define(ROUTER, emqttd_router).
|
|
|
|
|
|
+-define(HELPER, emqttd_pubsub_helper).
|
|
|
+
|
|
|
%%%=============================================================================
|
|
|
%%% Mnesia callbacks
|
|
|
%%%=============================================================================
|
|
|
|
|
|
mnesia(boot) ->
|
|
|
- %% topic table
|
|
|
+ %% Topic Table
|
|
|
ok = emqttd_mnesia:create_table(topic, [
|
|
|
{type, bag},
|
|
|
{ram_copies, [node()]},
|
|
|
{record_name, mqtt_topic},
|
|
|
{attributes, record_info(fields, mqtt_topic)}]),
|
|
|
- %% subscription table
|
|
|
+ RamOrDisc = case env(subscription) of
|
|
|
+ disc -> disc_copies;
|
|
|
+ _ -> ram_copies
|
|
|
+ end,
|
|
|
+ %% Subscription Table
|
|
|
ok = emqttd_mnesia:create_table(subscription, [
|
|
|
{type, bag},
|
|
|
- {ram_copies, [node()]},
|
|
|
+ {RamOrDisc, [node()]},
|
|
|
{record_name, mqtt_subscription},
|
|
|
{attributes, record_info(fields, mqtt_subscription)}]);
|
|
|
|
|
|
@@ -78,6 +89,9 @@ mnesia(copy) ->
|
|
|
ok = emqttd_mnesia:copy_table(topic),
|
|
|
ok = emqttd_mnesia:copy_table(subscription).
|
|
|
|
|
|
+env(Key) ->
|
|
|
+ proplists:get_value(Key, emqttd_broker:env(pubsub)).
|
|
|
+
|
|
|
%%%=============================================================================
|
|
|
%%% API
|
|
|
%%%=============================================================================
|
|
|
@@ -97,42 +111,46 @@ name(Id) ->
|
|
|
list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-%% @doc Create topic. Notice That this transaction is not protected by pubsub pool
|
|
|
+%% @doc Create Topic.
|
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-spec create(Topic :: binary()) -> ok | {error, Error :: any()}.
|
|
|
create(Topic) when is_binary(Topic) ->
|
|
|
- case mnesia:transaction(fun add_topic/1, [#mqtt_topic{topic = Topic, node = node()}]) of
|
|
|
- {atomic, ok} -> setstats(topics), ok;
|
|
|
+ Record = #mqtt_topic{topic = Topic, node = node()},
|
|
|
+ case mnesia:transaction(fun add_topic/1, [Record]) of
|
|
|
+ {atomic, ok} -> ok;
|
|
|
{aborted, Error} -> {error, Error}
|
|
|
end.
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-%% @doc Subscribe Topic
|
|
|
+%% @doc Subscribe Topics
|
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
|
--spec subscribe(Topic, Qos) -> {ok, Qos} when
|
|
|
- Topic :: binary(),
|
|
|
- Qos :: mqtt_qos() | mqtt_qos_name().
|
|
|
-subscribe(Topic, Qos) ->
|
|
|
- %%TODO:...
|
|
|
- subscribe([{Topic, Qos}]).
|
|
|
-
|
|
|
-spec subscribe({Topic, Qos} | list({Topic, Qos})) ->
|
|
|
{ok, Qos | list(Qos)} | {error, any()} when
|
|
|
Topic :: binary(),
|
|
|
Qos :: mqtt_qos() | mqtt_qos_name().
|
|
|
-subscribe({Topic, Qos}) when is_binary(Topic) andalso (?IS_QOS(Qos) orelse is_atom(Qos)) ->
|
|
|
- %%TODO:...
|
|
|
+subscribe({Topic, Qos}) ->
|
|
|
subscribe([{Topic, Qos}]);
|
|
|
+subscribe(TopicTable) when is_list(TopicTable) ->
|
|
|
+ call({subscribe, {undefined, self()}, fixqos(TopicTable)}).
|
|
|
+
|
|
|
+-spec subscribe(ClientId, {Topic, Qos} | list({Topic, Qos})) ->
|
|
|
+ {ok, Qos | list(Qos)} | {error, any()} when
|
|
|
+ ClientId :: binary(),
|
|
|
+ Topic :: binary(),
|
|
|
+ Qos :: mqtt_qos() | mqtt_qos_name().
|
|
|
+subscribe(ClientId, {Topic, Qos}) when is_binary(ClientId) ->
|
|
|
+ subscribe(ClientId, [{Topic, Qos}]);
|
|
|
+subscribe(ClientId, TopicTable) when is_binary(ClientId) andalso is_list(TopicTable) ->
|
|
|
+ call({subscribe, {ClientId, self()}, fixqos(TopicTable)}).
|
|
|
+
|
|
|
+fixqos(TopicTable) ->
|
|
|
+ [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable].
|
|
|
|
|
|
-subscribe(TopicTable0 = [{_Topic, _Qos} | _]) ->
|
|
|
- Self = self(),
|
|
|
- TopicTable = [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- TopicTable0],
|
|
|
- ?ROUTER:add_routes(TopicTable, Self),
|
|
|
- PubSub = gproc_pool:pick_worker(pubsub, Self),
|
|
|
- SubReq = {subscribe, Self, TopicTable},
|
|
|
- gen_server2:call(PubSub, SubReq, infinity).
|
|
|
+call(Request) ->
|
|
|
+ PubSub = gproc_pool:pick_worker(pubsub, self()),
|
|
|
+ gen_server2:call(PubSub, Request, infinity).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc Unsubscribe Topic or Topics
|
|
|
@@ -141,12 +159,18 @@ subscribe(TopicTable0 = [{_Topic, _Qos} | _]) ->
|
|
|
-spec unsubscribe(binary() | list(binary())) -> ok.
|
|
|
unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
unsubscribe([Topic]);
|
|
|
-
|
|
|
unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
|
|
|
- Self = self(),
|
|
|
- ?ROUTER:delete_routes(Topics, Self),
|
|
|
- PubSub = gproc_pool:pick_worker(pubsub, Self),
|
|
|
- gen_server2:cast(PubSub, {unsubscribe, Self, Topics}).
|
|
|
+ cast({unsubscribe, {undefined, self()}, Topics}).
|
|
|
+
|
|
|
+-spec unsubscribe(binary(), binary() | list(binary())) -> ok.
|
|
|
+unsubscribe(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) ->
|
|
|
+ unsubscribe(ClientId, [Topic]);
|
|
|
+unsubscribe(ClientId, Topics = [Topic|_]) when is_binary(Topic) ->
|
|
|
+ cast({unsubscribe, {ClientId, self()}, Topics}).
|
|
|
+
|
|
|
+cast(Msg) ->
|
|
|
+ PubSub = gproc_pool:pick_worker(pubsub, self()),
|
|
|
+ gen_server2:cast(PubSub, Msg).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc Publish to cluster nodes
|
|
|
@@ -169,10 +193,13 @@ publish(Msg = #mqtt_message{from = From}) ->
|
|
|
|
|
|
publish(Topic, Msg) when is_binary(Topic) ->
|
|
|
lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) ->
|
|
|
- rpc:cast(Node, ?ROUTER, route, [Name, Msg])
|
|
|
- end, match(Topic)).
|
|
|
+ rpc:cast(Node, ?ROUTER, route, [Name, Msg])
|
|
|
+ end, match(Topic)).
|
|
|
|
|
|
-%%TODO: Benchmark and refactor...
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% @doc Match Topic Name with Topic Filters
|
|
|
+%% @end
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
-spec match(Topic :: binary()) -> [mqtt_topic()].
|
|
|
match(Topic) when is_binary(Topic) ->
|
|
|
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]),
|
|
|
@@ -182,16 +209,29 @@ match(Topic) when is_binary(Topic) ->
|
|
|
%%% gen_server callbacks
|
|
|
%%%=============================================================================
|
|
|
|
|
|
-init([Pool, Id, _Opts]) ->
|
|
|
+init([Pool, Id, Opts]) ->
|
|
|
+ ?ROUTER:init(Opts),
|
|
|
?GPROC_POOL(join, Pool, Id),
|
|
|
+ process_flag(priority, high),
|
|
|
{ok, #state{pool = Pool, id = Id}}.
|
|
|
|
|
|
-%%TODO: clientId???
|
|
|
-handle_call({subscribe, _SubPid, TopicTable}, _From, State) ->
|
|
|
- Records = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable],
|
|
|
- case mnesia:transaction(fun() -> [add_topic(Record) || Record <- Records] end) of
|
|
|
- {atomic, _Result} ->
|
|
|
- {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, setstats(State)};
|
|
|
+handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State) ->
|
|
|
+ %% Clean aging topics
|
|
|
+ ?HELPER:clean([Topic || {Topic, _Qos} <- TopicTable]),
|
|
|
+
|
|
|
+ %% Add routes first
|
|
|
+ ?ROUTER:add_routes(TopicTable, SubPid),
|
|
|
+
|
|
|
+ %% Add topics
|
|
|
+ Node = node(),
|
|
|
+ TRecords = [#mqtt_topic{topic = Topic, node = Node} || {Topic, _Qos} <- TopicTable],
|
|
|
+
|
|
|
+ %% Add subscriptions
|
|
|
+ case mnesia:transaction(fun add_topics/1, [TRecords]) of
|
|
|
+ {atomic, _} ->
|
|
|
+ %% store subscription
|
|
|
+ %% mnesia:async_dirty(fun add_subscriptions/2, [SubId, TopicTable]),
|
|
|
+ {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State};
|
|
|
{aborted, Error} ->
|
|
|
{reply, {error, Error}, State}
|
|
|
end;
|
|
|
@@ -200,20 +240,29 @@ handle_call(Req, _From, State) ->
|
|
|
lager:error("Bad Request: ~p", [Req]),
|
|
|
{reply, {error, badreq}, State}.
|
|
|
|
|
|
-%%TODO: clientId???
|
|
|
-handle_cast({unsubscribe, SubPid, Topics}, State) when is_list(Topics) ->
|
|
|
+handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State) ->
|
|
|
+ %% Delete routes first
|
|
|
+ ?ROUTER:delete_routes(Topics, SubPid),
|
|
|
+
|
|
|
+ %% Remove subscriptions
|
|
|
+ mnesia:async_dirty(fun remove_subscriptions/2, [SubId, Topics]),
|
|
|
+
|
|
|
{noreply, State};
|
|
|
|
|
|
handle_cast(Msg, State) ->
|
|
|
lager:error("Bad Msg: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
+handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
|
|
+ ?ROUTER:delete_routes(DownPid),
|
|
|
+ {noreply, State, hibernate};
|
|
|
+
|
|
|
handle_info(Info, State) ->
|
|
|
lager:error("Unexpected Info: ~p", [Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
|
|
- ?GPROC_POOL(leave, Pool, Id), setstats(all).
|
|
|
+ ?GPROC_POOL(leave, Pool, Id).
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
@@ -222,6 +271,9 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%%% Internal functions
|
|
|
%%%=============================================================================
|
|
|
|
|
|
+add_topics(Records) ->
|
|
|
+ lists:foreach(fun add_topic/1, Records).
|
|
|
+
|
|
|
add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
|
|
|
case mnesia:wread({topic, Topic}) of
|
|
|
[] ->
|
|
|
@@ -234,35 +286,33 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
|
|
|
- case mnesia:read({subscriber, Topic}) of
|
|
|
- [] ->
|
|
|
- mnesia:delete_object(topic, TopicR, write),
|
|
|
- case mnesia:read(topic, Topic) of
|
|
|
- [] -> emqttd_trie:delete(Topic);
|
|
|
- _ -> ok
|
|
|
- end;
|
|
|
- _ ->
|
|
|
- ok
|
|
|
- end.
|
|
|
-
|
|
|
-%%%=============================================================================
|
|
|
-%%% Stats functions
|
|
|
-%%%=============================================================================
|
|
|
-
|
|
|
-setstats(State) ->
|
|
|
- emqttd_stats:setstats('topics/count', 'topics/max',
|
|
|
- mnesia:table_info(topic, size)), State.
|
|
|
+add_subscriptions(undefined, _TopicTable) ->
|
|
|
+ ok;
|
|
|
+add_subscriptions(SubId, TopicTable) ->
|
|
|
+ lists:foreach(fun({Topic, Qos}) ->
|
|
|
+ %%TODO: this is not right...
|
|
|
+ Subscription = #mqtt_subscription{subid = SubId, topic = Topic, qos = Qos},
|
|
|
+ mnesia:write(subscription, Subscription, write)
|
|
|
+ end,TopicTable).
|
|
|
+
|
|
|
+remove_subscriptions(undefined, _Topics) ->
|
|
|
+ ok;
|
|
|
+remove_subscriptions(SubId, Topics) ->
|
|
|
+ lists:foreach(fun(Topic) ->
|
|
|
+ Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'},
|
|
|
+ [mnesia:delete_object(subscription, Subscription, write)
|
|
|
+ || Subscription <- mnesia:match_object(subscription, Pattern, write)]
|
|
|
+ end, Topics).
|
|
|
|
|
|
%%%=============================================================================
|
|
|
-%%% Trace functions
|
|
|
+%%% Trace Functions
|
|
|
%%%=============================================================================
|
|
|
|
|
|
trace(publish, From, _Msg) when is_atom(From) ->
|
|
|
- %%dont' trace broker publish
|
|
|
+ %% Dont' trace broker publish
|
|
|
ignore;
|
|
|
|
|
|
trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
|
|
|
lager:info([{client, From}, {topic, Topic}],
|
|
|
- "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
|
|
+ "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
|
|
|