|
@@ -32,20 +32,16 @@
|
|
|
|
|
|
|
|
-include("emqttd.hrl").
|
|
-include("emqttd.hrl").
|
|
|
|
|
|
|
|
--behaviour(gen_server).
|
|
|
|
|
-
|
|
|
|
|
--define(SERVER, ?MODULE).
|
|
|
|
|
-
|
|
|
|
|
--define(SUBACK_ERR, 128).
|
|
|
|
|
-
|
|
|
|
|
%% Mnesia Callbacks
|
|
%% Mnesia Callbacks
|
|
|
-export([mnesia/1]).
|
|
-export([mnesia/1]).
|
|
|
|
|
|
|
|
-boot_mnesia({mnesia, [boot]}).
|
|
-boot_mnesia({mnesia, [boot]}).
|
|
|
-copy_mnesia({mnesia, [copy]}).
|
|
-copy_mnesia({mnesia, [copy]}).
|
|
|
|
|
|
|
|
|
|
+-behaviour(gen_server).
|
|
|
|
|
+
|
|
|
%% API Exports
|
|
%% API Exports
|
|
|
--export([start_link/0]).
|
|
|
|
|
|
|
+-export([start_link/0, name/1]).
|
|
|
|
|
|
|
|
-export([create/1,
|
|
-export([create/1,
|
|
|
subscribe/1, subscribe/2,
|
|
subscribe/1, subscribe/2,
|
|
@@ -58,6 +54,8 @@
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
terminate/2, code_change/3]).
|
|
terminate/2, code_change/3]).
|
|
|
|
|
|
|
|
|
|
+-define(SUBACK_ERR, 128).
|
|
|
|
|
+
|
|
|
-record(state, {submap :: map()}).
|
|
-record(state, {submap :: map()}).
|
|
|
|
|
|
|
|
%%%=============================================================================
|
|
%%%=============================================================================
|
|
@@ -76,7 +74,7 @@ mnesia(boot) ->
|
|
|
{ram_copies, [node()]},
|
|
{ram_copies, [node()]},
|
|
|
{record_name, mqtt_subscriber},
|
|
{record_name, mqtt_subscriber},
|
|
|
{attributes, record_info(fields, mqtt_subscriber)},
|
|
{attributes, record_info(fields, mqtt_subscriber)},
|
|
|
- {index, [subpid]},
|
|
|
|
|
|
|
+ {index, [pid]},
|
|
|
{local_content, true}]);
|
|
{local_content, true}]);
|
|
|
|
|
|
|
|
mnesia(copy) ->
|
|
mnesia(copy) ->
|
|
@@ -85,7 +83,9 @@ mnesia(copy) ->
|
|
|
|
|
|
|
|
%%%=============================================================================
|
|
%%%=============================================================================
|
|
|
%%% API
|
|
%%% API
|
|
|
|
|
+%%%
|
|
|
%%%=============================================================================
|
|
%%%=============================================================================
|
|
|
|
|
+%%%
|
|
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc
|
|
%% @doc
|
|
@@ -93,9 +93,12 @@ mnesia(copy) ->
|
|
|
%%
|
|
%%
|
|
|
%% @end
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
--spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
|
|
|
|
-start_link() ->
|
|
|
|
|
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
|
|
|
|
|
+-spec start_link(Opts) -> {ok, pid()} | ignore | {error, any()}.
|
|
|
|
|
+start_link(Opts) ->
|
|
|
|
|
+ gen_server:start_link(?MODULE, [], []).
|
|
|
|
|
+
|
|
|
|
|
+name(I) ->
|
|
|
|
|
+ list_to_atom("emqttd_pubsub_" ++ integer_to_list(I)).
|
|
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc
|
|
%% @doc
|
|
@@ -103,47 +106,41 @@ start_link() ->
|
|
|
%%
|
|
%%
|
|
|
%% @end
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
--spec create(binary()) -> ok.
|
|
|
|
|
|
|
+-spec create(binary()) -> {atomic, ok} | {aborted, Reason :: any()}.
|
|
|
create(Topic) when is_binary(Topic) ->
|
|
create(Topic) when is_binary(Topic) ->
|
|
|
- Record = #mqtt_topic{topic = Topic, node = node()},
|
|
|
|
|
- {atomic, ok} = mnesia:transaction(fun insert_topic/1, [Record]), ok.
|
|
|
|
|
|
|
+ TopicRecord = #mqtt_topic{topic = Topic, node = node()},
|
|
|
|
|
+ Result = mnesia:transaction(fun create_topic/1, [TopicRecord]),
|
|
|
|
|
+ setstats(topics), Result.
|
|
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc
|
|
%% @doc
|
|
|
-%% Subscribe topics
|
|
|
|
|
|
|
+%% Subscribe topic or topics.
|
|
|
%%
|
|
%%
|
|
|
%% @end
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
--spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when
|
|
|
|
|
|
|
+-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} when
|
|
|
Topic :: binary(),
|
|
Topic :: binary(),
|
|
|
Qos :: mqtt_qos().
|
|
Qos :: mqtt_qos().
|
|
|
subscribe(Topics = [{_Topic, _Qos} | _]) ->
|
|
subscribe(Topics = [{_Topic, _Qos} | _]) ->
|
|
|
{ok, lists:map(fun({Topic, Qos}) ->
|
|
{ok, lists:map(fun({Topic, Qos}) ->
|
|
|
case subscribe(Topic, Qos) of
|
|
case subscribe(Topic, Qos) of
|
|
|
- {ok, GrantedQos} ->
|
|
|
|
|
|
|
+ {ok, GrantedQos} ->
|
|
|
GrantedQos;
|
|
GrantedQos;
|
|
|
- Error ->
|
|
|
|
|
- lager:error("Failed to subscribe '~s': ~p", [Topic, Error]),
|
|
|
|
|
|
|
+ {error, Error} ->
|
|
|
|
|
+ lager:error("subscribe '~s' error: ~p", [Topic, Error]),
|
|
|
?SUBACK_ERR
|
|
?SUBACK_ERR
|
|
|
end
|
|
end
|
|
|
end, Topics)}.
|
|
end, Topics)}.
|
|
|
|
|
|
|
|
--spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}.
|
|
|
|
|
|
|
+-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()} | {error, any()}.
|
|
|
subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
|
|
subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
|
|
|
- TopicRecord = #mqtt_topic{topic = Topic, node = node()},
|
|
|
|
|
- Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()},
|
|
|
|
|
- F = fun() ->
|
|
|
|
|
- case insert_topic(TopicRecord) of
|
|
|
|
|
- ok -> insert_subscriber(Subscriber);
|
|
|
|
|
- Error -> Error
|
|
|
|
|
- end
|
|
|
|
|
- end,
|
|
|
|
|
- case mnesia:transaction(F) of
|
|
|
|
|
|
|
+ case create(Topic) of
|
|
|
{atomic, ok} ->
|
|
{atomic, ok} ->
|
|
|
- {ok, Qos};
|
|
|
|
|
- {aborted, Reason} ->
|
|
|
|
|
- {error, Reason}
|
|
|
|
|
- end.
|
|
|
|
|
|
|
+ Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = self()},
|
|
|
|
|
+ ets:insert_new(?SUBSCRIBER_TAB, Subscriber),
|
|
|
|
|
+ {ok, Qos}; % Grant all qos
|
|
|
|
|
+ {aborted, Reason} ->
|
|
|
|
|
+ {error, Reason}.
|
|
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc
|
|
%% @doc
|
|
@@ -153,15 +150,17 @@ subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
-spec unsubscribe(binary() | list(binary())) -> ok.
|
|
-spec unsubscribe(binary() | list(binary())) -> ok.
|
|
|
unsubscribe(Topic) when is_binary(Topic) ->
|
|
unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
- SubPid = self(),
|
|
|
|
|
|
|
+ Pattern = #mqtt_subscriber{topic = Topic, _ = '_', pid = self()},
|
|
|
|
|
+ ets:match_delete(?SUBSCRIBER_TAB, Pattern),
|
|
|
|
|
+
|
|
|
TopicRecord = #mqtt_topic{topic = Topic, node = node()},
|
|
TopicRecord = #mqtt_topic{topic = Topic, node = node()},
|
|
|
F = fun() ->
|
|
F = fun() ->
|
|
|
%%TODO record name...
|
|
%%TODO record name...
|
|
|
- Pattern = #mqtt_subscriber{topic = Topic, _ = '_', subpid = SubPid},
|
|
|
|
|
[mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)],
|
|
[mnesia:delete_object(Sub) || Sub <- mnesia:match_object(Pattern)],
|
|
|
try_remove_topic(TopicRecord)
|
|
try_remove_topic(TopicRecord)
|
|
|
end,
|
|
end,
|
|
|
- {atomic, _} = mneisa:transaction(F), ok;
|
|
|
|
|
|
|
+ %{atomic, _} = mneisa:transaction(F),
|
|
|
|
|
+ ok;
|
|
|
|
|
|
|
|
unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
|
|
unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
|
|
|
lists:foreach(fun(T) -> unsubscribe(T) end, Topics).
|
|
lists:foreach(fun(T) -> unsubscribe(T) end, Topics).
|
|
@@ -193,7 +192,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
|
|
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
|
|
|
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
|
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
|
|
- case mnesia:dirty_read(subscriber, Topic) of
|
|
|
|
|
|
|
+ case ets:lookup:(?SUBSCRIBER_TAB, Topic) of
|
|
|
[] ->
|
|
[] ->
|
|
|
%%TODO: not right when clusted...
|
|
%%TODO: not right when clusted...
|
|
|
setstats(dropped);
|
|
setstats(dropped);
|
|
@@ -307,15 +306,19 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%%%=============================================================================
|
|
%%%=============================================================================
|
|
|
%%% Internal functions
|
|
%%% Internal functions
|
|
|
%%%=============================================================================
|
|
%%%=============================================================================
|
|
|
-insert_topic(Record = #mqtt_topic{topic = Topic}) ->
|
|
|
|
|
|
|
+
|
|
|
|
|
+-spec create_topic(#mqtt_topic{}) -> {atomic, ok} | {aborted, any()}.
|
|
|
|
|
+create_topic(TopicRecord = #mqtt_topic{topic = Topic}) ->
|
|
|
case mnesia:wread({topic, Topic}) of
|
|
case mnesia:wread({topic, Topic}) of
|
|
|
[] ->
|
|
[] ->
|
|
|
ok = emqttd_trie:insert(Topic),
|
|
ok = emqttd_trie:insert(Topic),
|
|
|
- mnesia:write(topic, Record, write);
|
|
|
|
|
|
|
+ mnesia:write(topic, TopicRecord, write);
|
|
|
Records ->
|
|
Records ->
|
|
|
- case lists:member(Record, Records) of
|
|
|
|
|
- true -> ok;
|
|
|
|
|
- false -> mnesia:write(topic, Record, write)
|
|
|
|
|
|
|
+ case lists:member(TopicRecord, Records) of
|
|
|
|
|
+ true ->
|
|
|
|
|
+ ok;
|
|
|
|
|
+ false ->
|
|
|
|
|
+ mnesia:write(topic, TopicRecord, write)
|
|
|
end
|
|
end
|
|
|
end.
|
|
end.
|
|
|
|
|
|