emqx_lua_script.erl 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_lua_script).
  17. -include("emqx_lua_hook.hrl").
  18. -include_lib("emqx_libs/include/emqx.hrl").
  19. -include_lib("emqx_libs/include/emqx_mqtt.hrl").
  20. -export([ register_on_message_publish/2
  21. , register_on_client_connected/2
  22. , register_on_client_disconnected/2
  23. , register_on_client_subscribe/2
  24. , register_on_client_unsubscribe/2
  25. , register_on_message_acked/2
  26. , register_on_message_delivered/2
  27. , register_on_session_subscribed/2
  28. , register_on_session_unsubscribed/2
  29. , register_on_client_authenticate/2
  30. , register_on_client_check_acl/2
  31. , unregister_hooks/1
  32. ]).
  33. -export([ on_client_connected/4
  34. , on_client_disconnected/5
  35. , on_client_authenticate/4
  36. , on_client_check_acl/6
  37. , on_client_subscribe/5
  38. , on_client_unsubscribe/5
  39. , on_session_subscribed/5
  40. , on_session_unsubscribed/5
  41. , on_message_publish/3
  42. , on_message_delivered/4
  43. , on_message_acked/4
  44. ]).
  45. -define(EMPTY_USERNAME, <<"">>).
  46. -define(HOOK_ADD(A, B), emqx:hook(A, B)).
  47. -define(HOOK_DEL(A, B), emqx:unhook(A, B)).
  48. register_on_client_connected(ScriptName, LuaState) ->
  49. ?HOOK_ADD('client.connected', {?MODULE, on_client_connected, [ScriptName, LuaState]}).
  50. register_on_client_disconnected(ScriptName, LuaState) ->
  51. ?HOOK_ADD('client.disconnected', {?MODULE, on_client_disconnected, [ScriptName, LuaState]}).
  52. register_on_client_authenticate(ScriptName, LuaState) ->
  53. ?HOOK_ADD('client.authenticate', {?MODULE, on_client_authenticate, [ScriptName, LuaState]}).
  54. register_on_client_check_acl(ScriptName, LuaState) ->
  55. ?HOOK_ADD('client.check_acl', {?MODULE, on_client_check_acl, [ScriptName, LuaState]}).
  56. register_on_client_subscribe(ScriptName, LuaState) ->
  57. ?HOOK_ADD('client.subscribe', {?MODULE, on_client_subscribe, [ScriptName, LuaState]}).
  58. register_on_client_unsubscribe(ScriptName, LuaState) ->
  59. ?HOOK_ADD('client.unsubscribe', {?MODULE, on_client_unsubscribe, [ScriptName, LuaState]}).
  60. register_on_session_subscribed(ScriptName, LuaState) ->
  61. ?HOOK_ADD('session.subscribed', {?MODULE, on_session_subscribed, [ScriptName, LuaState]}).
  62. register_on_session_unsubscribed(ScriptName, LuaState) ->
  63. ?HOOK_ADD('session.unsubscribed', {?MODULE, on_session_unsubscribed, [ScriptName, LuaState]}).
  64. register_on_message_publish(ScriptName, LuaState) ->
  65. ?HOOK_ADD('message.publish', {?MODULE, on_message_publish, [ScriptName, LuaState]}).
  66. register_on_message_delivered(ScriptName, LuaState) ->
  67. ?HOOK_ADD('message.delivered', {?MODULE, on_message_delivered, [ScriptName, LuaState]}).
  68. register_on_message_acked(ScriptName, LuaState) ->
  69. ?HOOK_ADD('message.acked', {?MODULE, on_message_acked, [ScriptName, LuaState]}).
  70. unregister_hooks({ScriptName, LuaState}) ->
  71. ?HOOK_DEL('client.connected', {?MODULE, on_client_connected, [ScriptName, LuaState]}),
  72. ?HOOK_DEL('client.disconnected', {?MODULE, on_client_disconnected, [ScriptName, LuaState]}),
  73. ?HOOK_DEL('client.authenticate', {?MODULE, on_client_authenticate, [ScriptName, LuaState]}),
  74. ?HOOK_DEL('client.check_acl', {?MODULE, on_client_check_acl, [ScriptName, LuaState]}),
  75. ?HOOK_DEL('client.subscribe', {?MODULE, on_client_subscribe, [ScriptName, LuaState]}),
  76. ?HOOK_DEL('client.unsubscribe', {?MODULE, on_client_unsubscribe, [ScriptName, LuaState]}),
  77. ?HOOK_DEL('session.subscribed', {?MODULE, on_session_subscribed, [ScriptName, LuaState]}),
  78. ?HOOK_DEL('session.unsubscribed', {?MODULE, on_session_unsubscribed, [ScriptName, LuaState]}),
  79. ?HOOK_DEL('message.publish', {?MODULE, on_message_publish, [ScriptName, LuaState]}),
  80. ?HOOK_DEL('message.delivered', {?MODULE, on_message_delivered, [ScriptName, LuaState]}),
  81. ?HOOK_DEL('message.acked', {?MODULE, on_message_acked, [ScriptName, LuaState]}).
  82. on_client_connected(ClientInfo = #{clientid := ClientId, username := Username},
  83. ConnInfo, _ScriptName, LuaState) ->
  84. ?LOG(debug, "Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
  85. [ClientId, ClientInfo, ConnInfo]),
  86. case catch luerl:call_function([on_client_connected], [ClientId, Username], LuaState) of
  87. {'EXIT', St} ->
  88. ?LOG(error, "Failed to execute function on_client_connected(), which has syntax error, St=~p", [St]),
  89. ok;
  90. {_Result, _St} ->
  91. ok;
  92. Other ->
  93. ?LOG(error, "Lua function on_client_connected() caught exception, ~p", [Other]),
  94. ok
  95. end.
  96. on_client_disconnected(ClientInfo = #{clientid := ClientId, username := Username},
  97. ReasonCode, ConnInfo, _ScriptName, LuaState) ->
  98. ?LOG(debug, "Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
  99. [ClientId, ReasonCode, ClientInfo, ConnInfo]),
  100. case catch luerl:call_function([on_client_disconnected], [ClientId, Username, ReasonCode], LuaState) of
  101. {'EXIT', St} ->
  102. ?LOG(error, "Failed to execute function on_client_disconnected(), which has syntax error, St=~p", [St]),
  103. ok;
  104. {_Result, _St} ->
  105. ok;
  106. Other ->
  107. ?LOG(error, "Lua function on_client_disconnected() caught exception, ~p", [Other]),
  108. ok
  109. end.
  110. on_client_authenticate(#{clientid := ClientId,
  111. username := Username,
  112. peerhost := Peerhost,
  113. password := Password}, Result, _ScriptName, LuaState) ->
  114. case catch luerl:call_function([on_client_authenticate],
  115. [ClientId, Username, inet:ntoa(Peerhost), Password], LuaState) of
  116. {'EXIT', St} ->
  117. ?LOG(error, "Failed to execute function on_client_authenticate(), which has syntax error, St=~p", [St]),
  118. ok;
  119. {[<<"ignore">>], _St} ->
  120. ok;
  121. {[<<"ok">>], _St} ->
  122. {stop, Result#{auth_result => success}};
  123. Other ->
  124. ?LOG(error, "Lua function on_client_authenticate() caught exception, ~p", [Other]),
  125. ok
  126. end.
  127. on_client_check_acl(#{clientid := ClientId,
  128. username := Username,
  129. peerhost := Peerhost,
  130. password := Password}, Topic, PubSub, _Result, _ScriptName, LuaState) ->
  131. case catch luerl:call_function([on_client_check_acl], [ClientId, Username, inet:ntoa(Peerhost), Password, Topic, PubSub], LuaState) of
  132. {'EXIT', St} ->
  133. ?LOG(error, "Failed to execute function on_client_check_acl(), which has syntax error, St=~p", [St]),
  134. ok;
  135. {[<<"ignore">>],_St} ->
  136. ok;
  137. {[<<"allow">>], _St} ->
  138. {stop, allow};
  139. {[<<"deny">>], _St} ->
  140. {stop, deny};
  141. Other ->
  142. ?LOG(error, "Lua function on_client_check_acl() caught exception, ~p", [Other]),
  143. ok
  144. end.
  145. on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, TopicFilters, _ScriptName, LuaState) ->
  146. NewTopicFilters =
  147. lists:foldr(fun(TopicFilter, Acc) ->
  148. case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
  149. false -> Acc;
  150. NewTopicFilter -> [NewTopicFilter | Acc]
  151. end
  152. end, [], TopicFilters),
  153. case NewTopicFilters of
  154. [] -> stop;
  155. _ -> {ok, NewTopicFilters}
  156. end.
  157. on_client_subscribe_single(_ClientId, _Username, TopicFilter = {<<$$, _Rest/binary>>, _SubOpts}, _LuaState) ->
  158. %% ignore topics starting with $
  159. TopicFilter;
  160. on_client_subscribe_single(ClientId, Username, TopicFilter = {Topic, SubOpts}, LuaState) ->
  161. ?LOG(debug, "hook client(~s/~s) will subscribe: ~p~n", [ClientId, Username, Topic]),
  162. case catch luerl:call_function([on_client_subscribe], [ClientId, Username, Topic], LuaState) of
  163. {'EXIT', St} ->
  164. ?LOG(error, "Failed to execute function on_client_subscribe(), which has syntax error, St=~p", [St]),
  165. TopicFilter;
  166. {[false], _St} ->
  167. false; % cancel this topic's subscription
  168. {[NewTopic], _St} ->
  169. ?LOG(debug, "LUA function on_client_subscribe() return ~p", [NewTopic]),
  170. {NewTopic, SubOpts}; % modify topic
  171. Other ->
  172. ?LOG(error, "Lua function on_client_subscribe() caught exception, ~p", [Other]),
  173. TopicFilter
  174. end.
  175. on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties, TopicFilters, _ScriptName, LuaState) ->
  176. NewTopicFilters =
  177. lists:foldr(fun(TopicFilter, Acc) ->
  178. case on_client_unsubscribe_single(ClientId, Username, TopicFilter, LuaState) of
  179. false -> Acc;
  180. NewTopicFilter -> [NewTopicFilter | Acc]
  181. end
  182. end, [], TopicFilters),
  183. case NewTopicFilters of
  184. [] -> stop;
  185. _ -> {ok, NewTopicFilters}
  186. end.
  187. on_client_unsubscribe_single(_ClientId, _Username, TopicFilter = {<<$$, _Rest/binary>>, _SubOpts}, _LuaState) ->
  188. %% ignore topics starting with $
  189. TopicFilter;
  190. on_client_unsubscribe_single(ClientId, Username, TopicFilter = {Topic, SubOpts}, LuaState) ->
  191. ?LOG(debug, "hook client(~s/~s) unsubscribe ~p~n", [ClientId, Username, Topic]),
  192. case catch luerl:call_function([on_client_unsubscribe], [ClientId, Username, Topic], LuaState) of
  193. {'EXIT', St} ->
  194. ?LOG(error, "Failed to execute function on_client_unsubscribe(), which has syntax error, St=~p", [St]),
  195. TopicFilter;
  196. {[false], _St} ->
  197. false; % cancel this topic's unsubscription
  198. {[NewTopic], _} ->
  199. ?LOG(debug, "Lua function on_client_unsubscribe() return ~p", [NewTopic]),
  200. {NewTopic, SubOpts}; % modify topic
  201. Other ->
  202. ?LOG(error, "Topic=~p, lua function on_client_unsubscribe() caught exception, ~p", [Topic, Other]),
  203. TopicFilter
  204. end.
  205. on_session_subscribed(#{}, <<$$, _Rest/binary>>, _SubOpts, _ScriptName, _LuaState) ->
  206. %% ignore topics starting with $
  207. ok;
  208. on_session_subscribed(#{clientid := ClientId, username := Username},
  209. Topic, SubOpts, _ScriptName, LuaState) ->
  210. ?LOG(debug, "Session(~s/s) subscribed ~s with subopts: ~p~n", [ClientId, Username, Topic, SubOpts]),
  211. case catch luerl:call_function([on_session_subscribed], [ClientId, Username, Topic], LuaState) of
  212. {'EXIT', St} ->
  213. ?LOG(error, "Failed to execute function on_session_subscribed(), which has syntax error, St=~p", [St]),
  214. ok;
  215. {_Result, _St} ->
  216. ok;
  217. Other ->
  218. ?LOG(error, "Topic=~p, lua function on_session_subscribed() caught exception, ~p", [Topic, Other]),
  219. ok
  220. end.
  221. on_session_unsubscribed(#{}, <<$$, _Rest/binary>>, _SubOpts, _ScriptName, _LuaState) ->
  222. %% ignore topics starting with $
  223. ok;
  224. on_session_unsubscribed(#{clientid := ClientId, username := Username},
  225. Topic, _SubOpts, _ScriptName, LuaState) ->
  226. ?LOG(debug, "Session(~s/~s) unsubscribed ~s~n", [ClientId, Username, Topic]),
  227. case catch luerl:call_function([on_session_unsubscribed], [ClientId, Username, Topic], LuaState) of
  228. {'EXIT', St} ->
  229. ?LOG(error, "Failed to execute function on_session_unsubscribed(), which has syntax error, St=~p", [St]),
  230. ok;
  231. {_Result, _St} ->
  232. ok;
  233. Other ->
  234. ?LOG(error, "Topic=~p, lua function on_session_unsubscribed() caught exception, ~p", [Topic, Other]),
  235. ok
  236. end.
  237. on_message_publish(Message = #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
  238. %% ignore topics starting with $
  239. {ok, Message};
  240. on_message_publish(Message = #message{from = ClientId,
  241. qos = QoS,
  242. flags = Flags = #{retain := Retain},
  243. topic = Topic,
  244. payload = Payload,
  245. headers = Headers},
  246. _ScriptName, LuaState) ->
  247. Username = maps:get(username, Headers, ?EMPTY_USERNAME),
  248. ?LOG(debug, "Publish ~s~n", [emqx_message:format(Message)]),
  249. case catch luerl:call_function([on_message_publish], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
  250. {'EXIT', St} ->
  251. ?LOG(error, "Failed to execute function on_message_publish(), which has syntax error, St=~p", [St]),
  252. {ok, Message};
  253. {[false], _St} ->
  254. {stop, Message};
  255. {[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
  256. ?LOG(debug, "Lua function on_message_publish() return ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]),
  257. {ok, Message#message{topic = NewTopic, payload = NewPayload,
  258. qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
  259. Other ->
  260. ?LOG(error, "Topic=~p, lua function on_message_publish caught exception, ~p", [Topic, Other]),
  261. {ok, Message}
  262. end.
  263. on_message_delivered(#{}, #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
  264. %% ignore topics starting with $
  265. ok;
  266. on_message_delivered(#{clientid := ClientId, username := Username},
  267. Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = Flags = #{retain := Retain}},
  268. _ScriptName, LuaState) ->
  269. ?LOG(debug, "Message delivered to client(~s): ~s~n",
  270. [ClientId, emqx_message:format(Message)]),
  271. case catch luerl:call_function([on_message_delivered], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
  272. {'EXIT', St} ->
  273. ?LOG(error, "Failed to execute function on_message_delivered(), which has syntax error, St=~p", [St]),
  274. ok;
  275. {[false], _St} ->
  276. ok;
  277. {[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
  278. {ok, Message#message{topic = NewTopic, payload = NewPayload,
  279. qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
  280. Other ->
  281. ?LOG(error, "Topic=~p, lua function on_message_delivered() caught exception, ~p", [Topic, Other]),
  282. ok
  283. end.
  284. on_message_acked(#{}, #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
  285. %% ignore topics starting with $
  286. ok;
  287. on_message_acked(#{clientid := ClientId, username := Username},
  288. Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = #{retain := Retain}}, _ScriptName, LuaState) ->
  289. ?LOG(debug, "Message acked by client(~s): ~s~n",
  290. [ClientId, emqx_message:format(Message)]),
  291. case catch luerl:call_function([on_message_acked], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
  292. {'EXIT', St} ->
  293. ?LOG(error, "Failed to execute function on_message_acked(), which has syntax error, St=~p", [St]),
  294. ok;
  295. {_Result, _St} ->
  296. ok;
  297. Other ->
  298. ?LOG(error, "Topic=~p, lua function on_message_acked() caught exception, ~p", [Topic, Other]),
  299. ok
  300. end.
  301. to_retain(0) -> false;
  302. to_retain(1) -> true;
  303. to_retain("true") -> true;
  304. to_retain("false") -> false;
  305. to_retain(<<"true">>) -> true;
  306. to_retain(<<"false">>) -> false;
  307. to_retain(true) -> true;
  308. to_retain(false) -> false;
  309. to_retain(Num) when is_float(Num) ->
  310. case round(Num) of 0 -> false; _ -> true end.