| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_lua_script).
- -include("emqx_lua_hook.hrl").
- -include_lib("emqx_libs/include/emqx.hrl").
- -include_lib("emqx_libs/include/emqx_mqtt.hrl").
- -export([ register_on_message_publish/2
- , register_on_client_connected/2
- , register_on_client_disconnected/2
- , register_on_client_subscribe/2
- , register_on_client_unsubscribe/2
- , register_on_message_acked/2
- , register_on_message_delivered/2
- , register_on_session_subscribed/2
- , register_on_session_unsubscribed/2
- , register_on_client_authenticate/2
- , register_on_client_check_acl/2
- , unregister_hooks/1
- ]).
- -export([ on_client_connected/4
- , on_client_disconnected/5
- , on_client_authenticate/4
- , on_client_check_acl/6
- , on_client_subscribe/5
- , on_client_unsubscribe/5
- , on_session_subscribed/5
- , on_session_unsubscribed/5
- , on_message_publish/3
- , on_message_delivered/4
- , on_message_acked/4
- ]).
- -define(EMPTY_USERNAME, <<"">>).
- -define(HOOK_ADD(A, B), emqx:hook(A, B)).
- -define(HOOK_DEL(A, B), emqx:unhook(A, B)).
- register_on_client_connected(ScriptName, LuaState) ->
- ?HOOK_ADD('client.connected', {?MODULE, on_client_connected, [ScriptName, LuaState]}).
- register_on_client_disconnected(ScriptName, LuaState) ->
- ?HOOK_ADD('client.disconnected', {?MODULE, on_client_disconnected, [ScriptName, LuaState]}).
- register_on_client_authenticate(ScriptName, LuaState) ->
- ?HOOK_ADD('client.authenticate', {?MODULE, on_client_authenticate, [ScriptName, LuaState]}).
- register_on_client_check_acl(ScriptName, LuaState) ->
- ?HOOK_ADD('client.check_acl', {?MODULE, on_client_check_acl, [ScriptName, LuaState]}).
- register_on_client_subscribe(ScriptName, LuaState) ->
- ?HOOK_ADD('client.subscribe', {?MODULE, on_client_subscribe, [ScriptName, LuaState]}).
- register_on_client_unsubscribe(ScriptName, LuaState) ->
- ?HOOK_ADD('client.unsubscribe', {?MODULE, on_client_unsubscribe, [ScriptName, LuaState]}).
- register_on_session_subscribed(ScriptName, LuaState) ->
- ?HOOK_ADD('session.subscribed', {?MODULE, on_session_subscribed, [ScriptName, LuaState]}).
- register_on_session_unsubscribed(ScriptName, LuaState) ->
- ?HOOK_ADD('session.unsubscribed', {?MODULE, on_session_unsubscribed, [ScriptName, LuaState]}).
- register_on_message_publish(ScriptName, LuaState) ->
- ?HOOK_ADD('message.publish', {?MODULE, on_message_publish, [ScriptName, LuaState]}).
- register_on_message_delivered(ScriptName, LuaState) ->
- ?HOOK_ADD('message.delivered', {?MODULE, on_message_delivered, [ScriptName, LuaState]}).
- register_on_message_acked(ScriptName, LuaState) ->
- ?HOOK_ADD('message.acked', {?MODULE, on_message_acked, [ScriptName, LuaState]}).
- unregister_hooks({ScriptName, LuaState}) ->
- ?HOOK_DEL('client.connected', {?MODULE, on_client_connected, [ScriptName, LuaState]}),
- ?HOOK_DEL('client.disconnected', {?MODULE, on_client_disconnected, [ScriptName, LuaState]}),
- ?HOOK_DEL('client.authenticate', {?MODULE, on_client_authenticate, [ScriptName, LuaState]}),
- ?HOOK_DEL('client.check_acl', {?MODULE, on_client_check_acl, [ScriptName, LuaState]}),
- ?HOOK_DEL('client.subscribe', {?MODULE, on_client_subscribe, [ScriptName, LuaState]}),
- ?HOOK_DEL('client.unsubscribe', {?MODULE, on_client_unsubscribe, [ScriptName, LuaState]}),
- ?HOOK_DEL('session.subscribed', {?MODULE, on_session_subscribed, [ScriptName, LuaState]}),
- ?HOOK_DEL('session.unsubscribed', {?MODULE, on_session_unsubscribed, [ScriptName, LuaState]}),
- ?HOOK_DEL('message.publish', {?MODULE, on_message_publish, [ScriptName, LuaState]}),
- ?HOOK_DEL('message.delivered', {?MODULE, on_message_delivered, [ScriptName, LuaState]}),
- ?HOOK_DEL('message.acked', {?MODULE, on_message_acked, [ScriptName, LuaState]}).
- on_client_connected(ClientInfo = #{clientid := ClientId, username := Username},
- ConnInfo, _ScriptName, LuaState) ->
- ?LOG(debug, "Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
- [ClientId, ClientInfo, ConnInfo]),
- case catch luerl:call_function([on_client_connected], [ClientId, Username], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_client_connected(), which has syntax error, St=~p", [St]),
- ok;
- {_Result, _St} ->
- ok;
- Other ->
- ?LOG(error, "Lua function on_client_connected() caught exception, ~p", [Other]),
- ok
- end.
- on_client_disconnected(ClientInfo = #{clientid := ClientId, username := Username},
- ReasonCode, ConnInfo, _ScriptName, LuaState) ->
- ?LOG(debug, "Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
- [ClientId, ReasonCode, ClientInfo, ConnInfo]),
- case catch luerl:call_function([on_client_disconnected], [ClientId, Username, ReasonCode], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_client_disconnected(), which has syntax error, St=~p", [St]),
- ok;
- {_Result, _St} ->
- ok;
- Other ->
- ?LOG(error, "Lua function on_client_disconnected() caught exception, ~p", [Other]),
- ok
- end.
- on_client_authenticate(#{clientid := ClientId,
- username := Username,
- peerhost := Peerhost,
- password := Password}, Result, _ScriptName, LuaState) ->
- case catch luerl:call_function([on_client_authenticate],
- [ClientId, Username, inet:ntoa(Peerhost), Password], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_client_authenticate(), which has syntax error, St=~p", [St]),
- ok;
- {[<<"ignore">>], _St} ->
- ok;
- {[<<"ok">>], _St} ->
- {stop, Result#{auth_result => success}};
- Other ->
- ?LOG(error, "Lua function on_client_authenticate() caught exception, ~p", [Other]),
- ok
- end.
- on_client_check_acl(#{clientid := ClientId,
- username := Username,
- peerhost := Peerhost,
- password := Password}, Topic, PubSub, _Result, _ScriptName, LuaState) ->
- case catch luerl:call_function([on_client_check_acl], [ClientId, Username, inet:ntoa(Peerhost), Password, Topic, PubSub], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_client_check_acl(), which has syntax error, St=~p", [St]),
- ok;
- {[<<"ignore">>],_St} ->
- ok;
- {[<<"allow">>], _St} ->
- {stop, allow};
- {[<<"deny">>], _St} ->
- {stop, deny};
- Other ->
- ?LOG(error, "Lua function on_client_check_acl() caught exception, ~p", [Other]),
- ok
- end.
- on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, TopicFilters, _ScriptName, LuaState) ->
- NewTopicFilters =
- lists:foldr(fun(TopicFilter, Acc) ->
- case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
- false -> Acc;
- NewTopicFilter -> [NewTopicFilter | Acc]
- end
- end, [], TopicFilters),
- case NewTopicFilters of
- [] -> stop;
- _ -> {ok, NewTopicFilters}
- end.
- on_client_subscribe_single(_ClientId, _Username, TopicFilter = {<<$$, _Rest/binary>>, _SubOpts}, _LuaState) ->
- %% ignore topics starting with $
- TopicFilter;
- on_client_subscribe_single(ClientId, Username, TopicFilter = {Topic, SubOpts}, LuaState) ->
- ?LOG(debug, "hook client(~s/~s) will subscribe: ~p~n", [ClientId, Username, Topic]),
- case catch luerl:call_function([on_client_subscribe], [ClientId, Username, Topic], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_client_subscribe(), which has syntax error, St=~p", [St]),
- TopicFilter;
- {[false], _St} ->
- false; % cancel this topic's subscription
- {[NewTopic], _St} ->
- ?LOG(debug, "LUA function on_client_subscribe() return ~p", [NewTopic]),
- {NewTopic, SubOpts}; % modify topic
- Other ->
- ?LOG(error, "Lua function on_client_subscribe() caught exception, ~p", [Other]),
- TopicFilter
- end.
- on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties, TopicFilters, _ScriptName, LuaState) ->
- NewTopicFilters =
- lists:foldr(fun(TopicFilter, Acc) ->
- case on_client_unsubscribe_single(ClientId, Username, TopicFilter, LuaState) of
- false -> Acc;
- NewTopicFilter -> [NewTopicFilter | Acc]
- end
- end, [], TopicFilters),
- case NewTopicFilters of
- [] -> stop;
- _ -> {ok, NewTopicFilters}
- end.
- on_client_unsubscribe_single(_ClientId, _Username, TopicFilter = {<<$$, _Rest/binary>>, _SubOpts}, _LuaState) ->
- %% ignore topics starting with $
- TopicFilter;
- on_client_unsubscribe_single(ClientId, Username, TopicFilter = {Topic, SubOpts}, LuaState) ->
- ?LOG(debug, "hook client(~s/~s) unsubscribe ~p~n", [ClientId, Username, Topic]),
- case catch luerl:call_function([on_client_unsubscribe], [ClientId, Username, Topic], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_client_unsubscribe(), which has syntax error, St=~p", [St]),
- TopicFilter;
- {[false], _St} ->
- false; % cancel this topic's unsubscription
- {[NewTopic], _} ->
- ?LOG(debug, "Lua function on_client_unsubscribe() return ~p", [NewTopic]),
- {NewTopic, SubOpts}; % modify topic
- Other ->
- ?LOG(error, "Topic=~p, lua function on_client_unsubscribe() caught exception, ~p", [Topic, Other]),
- TopicFilter
- end.
- on_session_subscribed(#{}, <<$$, _Rest/binary>>, _SubOpts, _ScriptName, _LuaState) ->
- %% ignore topics starting with $
- ok;
- on_session_subscribed(#{clientid := ClientId, username := Username},
- Topic, SubOpts, _ScriptName, LuaState) ->
- ?LOG(debug, "Session(~s/s) subscribed ~s with subopts: ~p~n", [ClientId, Username, Topic, SubOpts]),
- case catch luerl:call_function([on_session_subscribed], [ClientId, Username, Topic], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_session_subscribed(), which has syntax error, St=~p", [St]),
- ok;
- {_Result, _St} ->
- ok;
- Other ->
- ?LOG(error, "Topic=~p, lua function on_session_subscribed() caught exception, ~p", [Topic, Other]),
- ok
- end.
- on_session_unsubscribed(#{}, <<$$, _Rest/binary>>, _SubOpts, _ScriptName, _LuaState) ->
- %% ignore topics starting with $
- ok;
- on_session_unsubscribed(#{clientid := ClientId, username := Username},
- Topic, _SubOpts, _ScriptName, LuaState) ->
- ?LOG(debug, "Session(~s/~s) unsubscribed ~s~n", [ClientId, Username, Topic]),
- case catch luerl:call_function([on_session_unsubscribed], [ClientId, Username, Topic], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_session_unsubscribed(), which has syntax error, St=~p", [St]),
- ok;
- {_Result, _St} ->
- ok;
- Other ->
- ?LOG(error, "Topic=~p, lua function on_session_unsubscribed() caught exception, ~p", [Topic, Other]),
- ok
- end.
- on_message_publish(Message = #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
- %% ignore topics starting with $
- {ok, Message};
- on_message_publish(Message = #message{from = ClientId,
- qos = QoS,
- flags = Flags = #{retain := Retain},
- topic = Topic,
- payload = Payload,
- headers = Headers},
- _ScriptName, LuaState) ->
- Username = maps:get(username, Headers, ?EMPTY_USERNAME),
- ?LOG(debug, "Publish ~s~n", [emqx_message:format(Message)]),
- case catch luerl:call_function([on_message_publish], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_message_publish(), which has syntax error, St=~p", [St]),
- {ok, Message};
- {[false], _St} ->
- {stop, Message};
- {[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
- ?LOG(debug, "Lua function on_message_publish() return ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]),
- {ok, Message#message{topic = NewTopic, payload = NewPayload,
- qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
- Other ->
- ?LOG(error, "Topic=~p, lua function on_message_publish caught exception, ~p", [Topic, Other]),
- {ok, Message}
- end.
- on_message_delivered(#{}, #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
- %% ignore topics starting with $
- ok;
- on_message_delivered(#{clientid := ClientId, username := Username},
- Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = Flags = #{retain := Retain}},
- _ScriptName, LuaState) ->
- ?LOG(debug, "Message delivered to client(~s): ~s~n",
- [ClientId, emqx_message:format(Message)]),
- case catch luerl:call_function([on_message_delivered], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_message_delivered(), which has syntax error, St=~p", [St]),
- ok;
- {[false], _St} ->
- ok;
- {[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
- {ok, Message#message{topic = NewTopic, payload = NewPayload,
- qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
- Other ->
- ?LOG(error, "Topic=~p, lua function on_message_delivered() caught exception, ~p", [Topic, Other]),
- ok
- end.
- on_message_acked(#{}, #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
- %% ignore topics starting with $
- ok;
- on_message_acked(#{clientid := ClientId, username := Username},
- Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = #{retain := Retain}}, _ScriptName, LuaState) ->
- ?LOG(debug, "Message acked by client(~s): ~s~n",
- [ClientId, emqx_message:format(Message)]),
- case catch luerl:call_function([on_message_acked], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
- {'EXIT', St} ->
- ?LOG(error, "Failed to execute function on_message_acked(), which has syntax error, St=~p", [St]),
- ok;
- {_Result, _St} ->
- ok;
- Other ->
- ?LOG(error, "Topic=~p, lua function on_message_acked() caught exception, ~p", [Topic, Other]),
- ok
- end.
- to_retain(0) -> false;
- to_retain(1) -> true;
- to_retain("true") -> true;
- to_retain("false") -> false;
- to_retain(<<"true">>) -> true;
- to_retain(<<"false">>) -> false;
- to_retain(true) -> true;
- to_retain(false) -> false;
- to_retain(Num) when is_float(Num) ->
- case round(Num) of 0 -> false; _ -> true end.
|