emqx_topic.erl 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
-module(emqx_topic).

%% APIs
-export([ match/2
        , validate/1
        , validate/2
        , levels/1
        , tokens/1
        , words/1
        , wildcard/1
        , join/1
        , prepend/2
        , feed_var/3
        , systop/1
        , parse/1
        , parse/2
        ]).

-export_type([ group/0
             , topic/0
             , word/0
             ]).

%% A group name, represented as a binary.
-type(group() :: binary()).
%% A raw topic name or topic filter, represented as a binary.
-type(topic() :: binary()).
%% One level of a split topic: '' stands for an empty level, '+' and '#'
%% for the wildcard levels, and any other level is kept as a binary
%% (see word/1).
-type(word() :: '' | '+' | '#' | binary()).
%% A topic split into its levels (see words/1).
-type(words() :: list(word())).

%% Largest topic size, in bytes, that validate/2 accepts.
-define(MAX_TOPIC_LEN, 65535).
  41. %%--------------------------------------------------------------------
  42. %% APIs
  43. %%--------------------------------------------------------------------
  44. %% @doc Is wildcard topic?
  45. -spec(wildcard(topic() | words()) -> true | false).
  46. wildcard(Topic) when is_binary(Topic) ->
  47. wildcard(words(Topic));
  48. wildcard([]) ->
  49. false;
  50. wildcard(['#'|_]) ->
  51. true;
  52. wildcard(['+'|_]) ->
  53. true;
  54. wildcard([_H|T]) ->
  55. wildcard(T).
  56. %% @doc Match Topic name with filter.
  57. -spec(match(Name, Filter) -> boolean() when
  58. Name :: topic() | words(),
  59. Filter :: topic() | words()).
  60. match(<<$$, _/binary>>, <<$+, _/binary>>) ->
  61. false;
  62. match(<<$$, _/binary>>, <<$#, _/binary>>) ->
  63. false;
  64. match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
  65. match(words(Name), words(Filter));
  66. match([], []) ->
  67. true;
  68. match([H|T1], [H|T2]) ->
  69. match(T1, T2);
  70. match([_H|T1], ['+'|T2]) ->
  71. match(T1, T2);
  72. match(_, ['#']) ->
  73. true;
  74. match([_H1|_], [_H2|_]) ->
  75. false;
  76. match([_H1|_], []) ->
  77. false;
  78. match([], [_H|_T2]) ->
  79. false.
  80. %% @doc Validate topic name or filter
  81. -spec(validate(topic() | {name | filter, topic()}) -> true).
  82. validate(Topic) when is_binary(Topic) ->
  83. validate(filter, Topic);
  84. validate({Type, Topic}) when Type =:= name; Type =:= filter ->
  85. validate(Type, Topic).
  86. -spec(validate(name | filter, topic()) -> true).
  87. validate(_, <<>>) ->
  88. error(empty_topic);
  89. validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
  90. error(topic_too_long);
  91. validate(filter, Topic) when is_binary(Topic) ->
  92. validate2(words(Topic));
  93. validate(name, Topic) when is_binary(Topic) ->
  94. Words = words(Topic),
  95. validate2(Words)
  96. andalso (not wildcard(Words))
  97. orelse error(topic_name_error).
  98. validate2([]) ->
  99. true;
  100. validate2(['#']) -> % end with '#'
  101. true;
  102. validate2(['#'|Words]) when length(Words) > 0 ->
  103. error('topic_invalid_#');
  104. validate2([''|Words]) ->
  105. validate2(Words);
  106. validate2(['+'|Words]) ->
  107. validate2(Words);
  108. validate2([W|Words]) ->
  109. validate3(W) andalso validate2(Words).
  110. validate3(<<>>) ->
  111. true;
  112. validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
  113. error('topic_invalid_char');
  114. validate3(<<_/utf8, Rest/binary>>) ->
  115. validate3(Rest).
  116. %% @doc Prepend a topic prefix.
  117. %% Ensured to have only one / between prefix and suffix.
  118. prepend(undefined, W) -> bin(W);
  119. prepend(<<>>, W) -> bin(W);
  120. prepend(Parent0, W) ->
  121. Parent = bin(Parent0),
  122. case binary:last(Parent) of
  123. $/ -> <<Parent/binary, (bin(W))/binary>>;
  124. _ -> <<Parent/binary, $/, (bin(W))/binary>>
  125. end.
  126. bin('') -> <<>>;
  127. bin('+') -> <<"+">>;
  128. bin('#') -> <<"#">>;
  129. bin(B) when is_binary(B) -> B;
  130. bin(L) when is_list(L) -> list_to_binary(L).
  131. -spec(levels(topic()) -> pos_integer()).
  132. levels(Topic) when is_binary(Topic) ->
  133. length(tokens(Topic)).
  134. -compile({inline, [tokens/1]}).
  135. %% @doc Split topic to tokens.
  136. -spec(tokens(topic()) -> list(binary())).
  137. tokens(Topic) ->
  138. binary:split(Topic, <<"/">>, [global]).
  139. %% @doc Split Topic Path to Words
  140. -spec(words(topic()) -> words()).
  141. words(Topic) when is_binary(Topic) ->
  142. [word(W) || W <- tokens(Topic)].
  143. word(<<>>) -> '';
  144. word(<<"+">>) -> '+';
  145. word(<<"#">>) -> '#';
  146. word(Bin) -> Bin.
  147. %% @doc '$SYS' Topic.
  148. -spec(systop(atom()|string()|binary()) -> topic()).
  149. systop(Name) when is_atom(Name); is_list(Name) ->
  150. iolist_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name]));
  151. systop(Name) when is_binary(Name) ->
  152. iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/", Name]).
  153. -spec(feed_var(binary(), binary(), binary()) -> binary()).
  154. feed_var(Var, Val, Topic) ->
  155. feed_var(Var, Val, words(Topic), []).
  156. feed_var(_Var, _Val, [], Acc) ->
  157. join(lists:reverse(Acc));
  158. feed_var(Var, Val, [Var|Words], Acc) ->
  159. feed_var(Var, Val, Words, [Val|Acc]);
  160. feed_var(Var, Val, [W|Words], Acc) ->
  161. feed_var(Var, Val, Words, [W|Acc]).
  162. -spec(join(list(binary())) -> binary()).
  163. join([]) ->
  164. <<>>;
  165. join([W]) ->
  166. bin(W);
  167. join(Words) ->
  168. {_, Bin} = lists:foldr(
  169. fun(W, {true, Tail}) ->
  170. {false, <<W/binary, Tail/binary>>};
  171. (W, {false, Tail}) ->
  172. {false, <<W/binary, "/", Tail/binary>>}
  173. end, {true, <<>>}, [bin(W) || W <- Words]),
  174. Bin.
  175. -spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}).
  176. parse(TopicFilter) when is_binary(TopicFilter) ->
  177. parse(TopicFilter, #{});
  178. parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
  179. parse(TopicFilter, Options).
  180. -spec(parse(topic(), map()) -> {topic(), map()}).
  181. parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
  182. error({invalid_topic_filter, TopicFilter});
  183. parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
  184. error({invalid_topic_filter, TopicFilter});
  185. parse(<<"$queue/", TopicFilter/binary>>, Options) ->
  186. parse(TopicFilter, Options#{share => <<"$queue">>});
  187. parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
  188. case binary:split(Rest, <<"/">>) of
  189. [_Any] -> error({invalid_topic_filter, TopicFilter});
  190. [ShareName, Filter] ->
  191. case binary:match(ShareName, [<<"+">>, <<"#">>]) of
  192. nomatch -> parse(Filter, Options#{share => ShareName});
  193. _ -> error({invalid_topic_filter, TopicFilter})
  194. end
  195. end;
  196. parse(TopicFilter, Options) ->
  197. {TopicFilter, Options}.