emqtt_topic.erl 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. -module(emqtt_topic).
  2. -include("emqtt.hrl").
  3. -export([start_link/0,
  4. match/1,
  5. insert/1,
  6. delete/1]).
  7. -behaviour(gen_server).
  8. -export([init/1,
  9. handle_call/3,
  10. handle_cast/2,
  11. handle_info/2,
  12. terminate/2,
  13. code_change/3]).
  14. -record(state, {}).
  15. start_link() ->
  16. gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  17. match(Topic) when is_binary(Topic) ->
  18. Words = topic_split(Topic),
  19. DirectMatches = mnesia:dirty_read(direct_topic, Words),
  20. WildcardMatches = lists:append([
  21. mnesia:dirty_read(wildcard_topic, Key) ||
  22. Key <- mnesia:dirty_all_keys(wildcard_topic),
  23. topic_match(Words, Key)
  24. ]),
  25. DirectMatches ++ WildcardMatches.
  26. insert(Topic) when is_binary(Topic) ->
  27. gen_server:call(?MODULE, {insert, Topic}).
  28. delete(Topic) when is_binary(Topic) ->
  29. gen_server:cast(?MODULE, {delete, Topic}).
  30. init([]) ->
  31. {atomic, ok} = mnesia:create_table(
  32. direct_topic, [
  33. {record_name, topic},
  34. {ram_copies, [node()]},
  35. {attributes, record_info(fields, topic)}]),
  36. {atomic, ok} = mnesia:create_table(
  37. wildcard_topic, [
  38. {record_name, topic},
  39. {ram_copies, [node()]},
  40. {attributes, record_info(fields, topic)}]),
  41. error_logger:info_msg("emqtt_topic is started."),
  42. {ok, #state{}}.
  43. handle_call({insert, Topic}, _From, State) ->
  44. Words = topic_split(Topic),
  45. Reply =
  46. case topic_type(Words) of
  47. direct ->
  48. mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic});
  49. wildcard ->
  50. mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic})
  51. end,
  52. {reply, Reply, State};
  53. handle_call(Req, _From, State) ->
  54. {stop, {badreq, Req}, State}.
  55. handle_cast({delete, Topic}, State) ->
  56. Words = topic_split(Topic),
  57. case topic_type(Words) of
  58. direct ->
  59. mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic});
  60. wildcard ->
  61. mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic})
  62. end,
  63. {noreply, State};
  64. handle_cast(Msg, State) ->
  65. {stop, {badmsg, Msg}, State}.
  66. handle_info(Info, State) ->
  67. {stop, {badinfo, Info}, State}.
  68. terminate(_Reason, _State) ->
  69. ok.
  70. code_change(_OldVsn, _State, _Extra) ->
  71. ok.
  72. topic_type([]) ->
  73. direct;
  74. topic_type([<<"#">>]) ->
  75. wildcard;
  76. topic_type([<<"+">>|_T]) ->
  77. wildcard;
  78. topic_type([_|T]) ->
  79. topic_type(T).
  80. topic_match([], []) ->
  81. true;
  82. topic_match([H|T1], [H|T2]) ->
  83. topic_match(T1, T2);
  84. topic_match([_H|T1], [<<"+">>|T2]) ->
  85. topic_match(T1, T2);
  86. topic_match(_, [<<"#">>]) ->
  87. true;
  88. topic_match([], [_H|_T2]) ->
  89. false.
  90. topic_split(S) ->
  91. binary:split(S, [<<"/">>], [global]).