emqttd_mod_rewrite.erl 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc emqttd rewrite module
  17. -module(emqttd_mod_rewrite).
  18. -behaviour(emqttd_gen_mod).
  19. -include("emqttd.hrl").
  20. -export([load/1, reload/1, unload/1]).
  21. -export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]).
  22. %%--------------------------------------------------------------------
  23. %% API
  24. %%--------------------------------------------------------------------
  25. load(Opts) ->
  26. File = proplists:get_value(file, Opts),
  27. {ok, Terms} = file:consult(File),
  28. Sections = compile(Terms),
  29. emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Sections]),
  30. emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Sections]),
  31. emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]).
  32. rewrite_subscribe(_ClientId, TopicTable, Sections) ->
  33. lager:info("Rewrite subscribe: ~p", [TopicTable]),
  34. {ok, [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]}.
  35. rewrite_unsubscribe(_ClientId, Topics, Sections) ->
  36. lager:info("Rewrite unsubscribe: ~p", [Topics]),
  37. {ok, [match_topic(Topic, Sections) || Topic <- Topics]}.
  38. rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) ->
  39. %%TODO: this will not work if the client is always online.
  40. RewriteTopic =
  41. case get({rewrite, Topic}) of
  42. undefined ->
  43. DestTopic = match_topic(Topic, Sections),
  44. put({rewrite, Topic}, DestTopic), DestTopic;
  45. DestTopic ->
  46. DestTopic
  47. end,
  48. {ok, Message#mqtt_message{topic = RewriteTopic}}.
  49. reload(File) ->
  50. %%TODO: The unload api is not right...
  51. case emqttd_app:is_mod_enabled(rewrite) of
  52. true ->
  53. unload(state),
  54. load([{file, File}]);
  55. false ->
  56. {error, module_unloaded}
  57. end.
  58. unload(_) ->
  59. emqttd:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/3),
  60. emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3),
  61. emqttd:unhook('message.publish', fun ?MODULE:rewrite_publish/2).
  62. %%--------------------------------------------------------------------
  63. %% Internal functions
  64. %%--------------------------------------------------------------------
  65. compile(Sections) ->
  66. C = fun({rewrite, Re, Dest}) ->
  67. {ok, MP} = re:compile(Re),
  68. {rewrite, MP, Dest}
  69. end,
  70. F = fun({topic, Topic, Rules}) ->
  71. {topic, list_to_binary(Topic), [C(R) || R <- Rules]}
  72. end,
  73. [F(Section) || Section <- Sections].
  74. match_topic(Topic, []) ->
  75. Topic;
  76. match_topic(Topic, [{topic, Filter, Rules} | Sections]) ->
  77. case emqttd_topic:match(Topic, Filter) of
  78. true ->
  79. match_rule(Topic, Rules);
  80. false ->
  81. match_topic(Topic, Sections)
  82. end.
  83. match_rule(Topic, []) ->
  84. Topic;
  85. match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
  86. case re:run(Topic, MP, [{capture, all_but_first, list}]) of
  87. {match, Captured} ->
  88. Vars = lists:zip(["\\$" ++ integer_to_list(I)
  89. || I <- lists:seq(1, length(Captured))], Captured),
  90. iolist_to_binary(lists:foldl(
  91. fun({Var, Val}, Acc) ->
  92. re:replace(Acc, Var, Val, [global])
  93. end, Dest, Vars));
  94. nomatch ->
  95. match_rule(Topic, Rules)
  96. end.