emqx_rule_engine_SUITE.erl 128 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763376437653766376737683769377037713772377337743775377637773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rule_engine_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("common_test/include/ct.hrl").
  21. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  22. -include_lib("emqx/include/asserts.hrl").
  23. -include_lib("emqx/include/emqx.hrl").
  24. -import(emqx_common_test_helpers, [on_exit/1]).
  25. %%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
  26. -define(TMP_RULEID, atom_to_binary(?FUNCTION_NAME)).
  27. all() ->
  28. [
  29. {group, engine},
  30. {group, funcs},
  31. {group, registry},
  32. {group, runtime},
  33. {group, events},
  34. {group, telemetry},
  35. {group, bugs},
  36. {group, metrics},
  37. {group, metrics_simple},
  38. {group, metrics_fail},
  39. {group, metrics_fail_simple},
  40. {group, tracing}
  41. ].
  42. suite() ->
  43. [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
  44. groups() ->
  45. [
  46. {engine, [sequence], [t_create_rule]},
  47. {funcs, [], [t_kv_store]},
  48. {registry, [sequence], [
  49. t_add_get_remove_rule,
  50. t_add_get_remove_rules,
  51. t_create_existing_rule,
  52. t_get_rules_for_topic,
  53. t_get_rules_for_topic_2,
  54. t_get_rules_with_same_event,
  55. t_get_rule_ids_by_action,
  56. t_ensure_action_removed
  57. ]},
  58. {runtime, [], [
  59. t_match_atom_and_binary,
  60. t_sqlselect_0,
  61. t_sqlselect_00,
  62. t_sqlselect_with_3rd_party_impl,
  63. t_sqlselect_with_3rd_party_impl2,
  64. t_sqlselect_with_3rd_party_funcs_unknown,
  65. t_sqlselect_001,
  66. t_sqlselect_002,
  67. t_sqlselect_inject_props,
  68. t_sqlselect_01,
  69. t_sqlselect_02,
  70. t_sqlselect_03,
  71. t_sqlselect_1,
  72. t_sqlselect_2,
  73. t_sqlselect_3,
  74. t_sqlselect_message_publish_event_keep_original_props_1,
  75. t_sqlselect_message_publish_event_keep_original_props_2,
  76. t_sqlselect_missing_template_vars_render_as_undefined,
  77. t_sqlparse_event_1,
  78. t_sqlparse_event_2,
  79. t_sqlparse_event_3,
  80. t_sqlparse_foreach_1,
  81. t_sqlparse_foreach_2,
  82. t_sqlparse_foreach_3,
  83. t_sqlparse_foreach_4,
  84. t_sqlparse_foreach_5,
  85. t_sqlparse_foreach_6,
  86. t_sqlparse_foreach_7,
  87. t_sqlparse_foreach_8,
  88. t_sqlparse_foreach_9,
  89. t_sqlparse_case_when_1,
  90. t_sqlparse_case_when_2,
  91. t_sqlparse_case_when_3,
  92. t_sqlparse_array_index_1,
  93. t_sqlparse_array_index_2,
  94. t_sqlparse_array_index_3,
  95. t_sqlparse_array_index_4,
  96. t_sqlparse_array_index_5,
  97. t_sqlparse_array_with_expressions,
  98. t_sqlparse_select_matadata_1,
  99. t_sqlparse_array_range_1,
  100. t_sqlparse_array_range_2,
  101. t_sqlparse_true_false,
  102. t_sqlparse_undefined_variable,
  103. t_sqlparse_new_map,
  104. t_sqlparse_invalid_json,
  105. t_sqlselect_as_put
  106. ]},
  107. {events, [], [
  108. t_events,
  109. t_event_client_disconnected_normal,
  110. t_event_client_disconnected_kicked,
  111. t_event_client_disconnected_discarded,
  112. t_event_client_disconnected_takenover,
  113. t_event_client_disconnected_takenover_2
  114. ]},
  115. {telemetry, [], [
  116. t_get_basic_usage_info_0,
  117. t_get_basic_usage_info_1
  118. ]},
  119. {bugs, [], [
  120. t_sqlparse_payload_as,
  121. t_sqlparse_nested_get
  122. ]},
  123. {metrics, [], [
  124. t_rule_metrics_sync,
  125. t_rule_metrics_async
  126. ]},
  127. {metrics_simple, [], [
  128. t_rule_metrics_sync,
  129. t_rule_metrics_async
  130. ]},
  131. {metrics_fail, [], [
  132. t_rule_metrics_sync_fail,
  133. t_rule_metrics_async_fail
  134. ]},
  135. {metrics_fail_simple, [], [
  136. t_rule_metrics_sync_fail,
  137. t_rule_metrics_async_fail
  138. ]},
  139. {tracing, [], [
  140. t_trace_rule_id
  141. ]}
  142. ].
  143. %%------------------------------------------------------------------------------
  144. %% Overall setup/teardown
  145. %%------------------------------------------------------------------------------
  146. init_per_suite(Config) ->
  147. %% ensure module loaded
  148. emqx_rule_funcs_demo:module_info(),
  149. application:load(emqx_conf),
  150. ok = emqx_common_test_helpers:start_apps(
  151. [emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge],
  152. fun set_special_configs/1
  153. ),
  154. Config.
  155. end_per_suite(_Config) ->
  156. emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]),
  157. ok.
  158. set_special_configs(emqx_auth) ->
  159. {ok, _} = emqx:update_config(
  160. [authorization],
  161. #{
  162. <<"no_match">> => atom_to_binary(allow),
  163. <<"cache">> => #{<<"enable">> => atom_to_binary(true)},
  164. <<"sources">> => []
  165. }
  166. ),
  167. ok;
  168. set_special_configs(_) ->
  169. ok.
  170. on_resource_create(_id, _) -> #{}.
  171. on_resource_destroy(_id, _) -> ok.
  172. on_get_resource_status(_id, _) -> #{}.
  173. %%------------------------------------------------------------------------------
  174. %% Group specific setup/teardown
  175. %%------------------------------------------------------------------------------
  176. group(_Groupname) ->
  177. [].
  178. -define(BRIDGE_IMPL, emqx_bridge_mqtt_connector).
  179. init_per_group(registry, Config) ->
  180. Config;
  181. init_per_group(metrics_fail, Config) ->
  182. meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
  183. meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}),
  184. [{mecked, [?BRIDGE_IMPL]} | Config];
  185. init_per_group(metrics_simple, Config) ->
  186. meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]),
  187. meck:expect(?BRIDGE_IMPL, query_mode, fun
  188. (#{resource_opts := #{query_mode := sync}}) -> simple_sync;
  189. (_) -> simple_async
  190. end),
  191. [{mecked, [?BRIDGE_IMPL]} | Config];
  192. init_per_group(metrics_fail_simple, Config) ->
  193. meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]),
  194. meck:expect(?BRIDGE_IMPL, query_mode, fun
  195. (#{resource_opts := #{query_mode := sync}}) -> simple_sync;
  196. (_) -> simple_async
  197. end),
  198. meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
  199. meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) ->
  200. Result = {error, {unrecoverable_error, mecked_failure}},
  201. erlang:apply(ReplyFun, Args ++ [Result]),
  202. Result
  203. end),
  204. [{mecked, [?BRIDGE_IMPL]} | Config];
  205. init_per_group(_Groupname, Config) ->
  206. Config.
  207. end_per_group(_Groupname, Config) ->
  208. case ?config(mecked, Config) of
  209. undefined -> ok;
  210. Mecked -> meck:unload(Mecked)
  211. end.
  212. %%------------------------------------------------------------------------------
  213. %% Testcase specific setup/teardown
  214. %%------------------------------------------------------------------------------
  215. init_per_testcase(t_events, Config) ->
  216. init_events_counters(),
  217. SQL =
  218. "SELECT * FROM \"$events/client_connected\", "
  219. "\"$events/client_disconnected\", "
  220. "\"$events/client_connack\", "
  221. "\"$events/client_check_authz_complete\", "
  222. "\"$events/session_subscribed\", "
  223. "\"$events/session_unsubscribed\", "
  224. "\"$events/message_acked\", "
  225. "\"$events/message_delivered\", "
  226. "\"$events/message_dropped\", "
  227. "\"$events/delivery_dropped\", "
  228. "\"t1\"",
  229. {ok, Rule} = emqx_rule_engine:create_rule(
  230. #{
  231. id => <<"rule:t_events">>,
  232. sql => SQL,
  233. actions => [
  234. #{
  235. function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>,
  236. args => #{}
  237. }
  238. ],
  239. description => <<"to console and record triggered events">>
  240. }
  241. ),
  242. ?assertMatch(#{id := <<"rule:t_events">>}, Rule),
  243. [{hook_points_rules, Rule} | Config];
  244. init_per_testcase(t_get_basic_usage_info_1, Config) ->
  245. meck:new(emqx_bridge, [passthrough, no_link, no_history]),
  246. meck:expect(emqx_bridge, lookup, fun(_Type, _Name) -> {ok, #{mocked => true}} end),
  247. Config;
  248. init_per_testcase(_TestCase, Config) ->
  249. Config.
  250. end_per_testcase(t_events, Config) ->
  251. ets:delete(events_record_tab),
  252. ok = delete_rule(?config(hook_points_rules, Config)),
  253. emqx_common_test_helpers:call_janitor(),
  254. ok;
  255. end_per_testcase(t_get_basic_usage_info_1, _Config) ->
  256. meck:unload(),
  257. emqx_common_test_helpers:call_janitor(),
  258. ok;
  259. end_per_testcase(_TestCase, _Config) ->
  260. emqx_common_test_helpers:call_janitor(),
  261. ok.
  262. %%------------------------------------------------------------------------------
  263. %% Test cases for rule engine
  264. %%------------------------------------------------------------------------------
  265. t_create_rule(_Config) ->
  266. {ok, #{id := Id}} = emqx_rule_engine:create_rule(
  267. #{
  268. sql => <<"select * from \"t/a\"">>,
  269. id => <<"t_create_rule">>,
  270. actions => [#{function => console}],
  271. description => <<"debug rule">>
  272. }
  273. ),
  274. ct:pal("======== emqx_rule_engine:get_rules :~p", [emqx_rule_engine:get_rules()]),
  275. ?assertMatch(
  276. {ok, #{id := Id, from := [<<"t/a">>]}},
  277. emqx_rule_engine:get_rule(Id)
  278. ),
  279. delete_rule(Id),
  280. ok.
  281. %%------------------------------------------------------------------------------
  282. %% Test cases for rule funcs
  283. %%------------------------------------------------------------------------------
  284. t_kv_store(_) ->
  285. undefined = emqx_rule_funcs:kv_store_get(<<"abc">>),
  286. <<"not_found">> = emqx_rule_funcs:kv_store_get(<<"abc">>, <<"not_found">>),
  287. emqx_rule_funcs:kv_store_put(<<"abc">>, 1),
  288. 1 = emqx_rule_funcs:kv_store_get(<<"abc">>),
  289. emqx_rule_funcs:kv_store_del(<<"abc">>),
  290. undefined = emqx_rule_funcs:kv_store_get(<<"abc">>).
  291. t_function_clause_errors(_Config) ->
  292. SQL0 = <<"select upper(xxxx) from \"t/a\"">>,
  293. Payload = <<"{}">>,
  294. ?assertMatch(
  295. {error,
  296. {select_and_transform_error,
  297. {throw,
  298. #{
  299. arguments := [undefined],
  300. reason := bad_sql_function_argument,
  301. function_name := upper
  302. },
  303. _Stack}}},
  304. emqx_rule_sqltester:test(
  305. #{
  306. sql => SQL0,
  307. context => #{payload => Payload, topic => <<"t/a">>}
  308. }
  309. )
  310. ),
  311. SQL1 = <<"foreach xs as x do upper(xxxx) from \"t/a\"">>,
  312. ?assertMatch(
  313. {error, {
  314. {doeach_error,
  315. {throw,
  316. #{
  317. arguments := [undefined],
  318. reason := bad_sql_function_argument,
  319. function_name := upper
  320. },
  321. _Stack0}},
  322. _Stack1
  323. }},
  324. emqx_rule_sqltester:test(
  325. #{
  326. sql => SQL1,
  327. context => #{payload => Payload, xs => [1, 2, 3], topic => <<"t/a">>}
  328. }
  329. )
  330. ),
  331. SQL2 = <<"foreach upper(xxxx) as x from \"t/a\"">>,
  332. ?assertMatch(
  333. {error,
  334. {select_and_collect_error,
  335. {throw,
  336. #{
  337. arguments := [undefined],
  338. reason := bad_sql_function_argument,
  339. function_name := upper
  340. },
  341. _Stack}}},
  342. emqx_rule_sqltester:test(
  343. #{
  344. sql => SQL2,
  345. context => #{payload => Payload, topic => <<"t/a">>}
  346. }
  347. )
  348. ),
  349. ok.
  350. %%------------------------------------------------------------------------------
  351. %% Test cases for rule registry
  352. %%------------------------------------------------------------------------------
  353. t_add_get_remove_rule(_Config) ->
  354. RuleId0 = <<"rule-debug-0">>,
  355. ok = create_rule(make_simple_rule(RuleId0)),
  356. ?assertMatch({ok, #{id := RuleId0}}, emqx_rule_engine:get_rule(RuleId0)),
  357. ok = delete_rule(RuleId0),
  358. ?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId0)),
  359. RuleId1 = <<"rule-debug-1">>,
  360. Rule1 = make_simple_rule(RuleId1),
  361. ok = create_rule(Rule1),
  362. ?assertMatch({ok, #{id := RuleId1}}, emqx_rule_engine:get_rule(RuleId1)),
  363. ok = delete_rule(Rule1),
  364. ?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId1)),
  365. ok.
  366. t_add_get_remove_rules(_Config) ->
  367. delete_rules_by_ids([Id || #{id := Id} <- emqx_rule_engine:get_rules()]),
  368. ok = create_rules(
  369. [
  370. make_simple_rule(<<"rule-debug-1">>),
  371. make_simple_rule(<<"rule-debug-2">>)
  372. ]
  373. ),
  374. ?assertEqual(2, length(emqx_rule_engine:get_rules())),
  375. ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>]),
  376. ?assertEqual([], emqx_rule_engine:get_rules()),
  377. ok.
  378. t_create_existing_rule(_Config) ->
  379. %% create a rule using given rule id
  380. {ok, _} = emqx_rule_engine:create_rule(
  381. #{
  382. id => <<"an_existing_rule">>,
  383. sql => <<"select * from \"t/#\"">>,
  384. actions => [#{function => console}]
  385. }
  386. ),
  387. {ok, #{sql := SQL}} = emqx_rule_engine:get_rule(<<"an_existing_rule">>),
  388. ?assertEqual(<<"select * from \"t/#\"">>, SQL),
  389. ok = delete_rule(<<"an_existing_rule">>),
  390. ?assertEqual(not_found, emqx_rule_engine:get_rule(<<"an_existing_rule">>)),
  391. ok.
  392. t_get_rules_for_topic(_Config) ->
  393. Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>)),
  394. ok = create_rules(
  395. [
  396. make_simple_rule(<<"rule-debug-1">>),
  397. make_simple_rule(<<"rule-debug-2">>)
  398. ]
  399. ),
  400. ?assertEqual(Len0 + 2, length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>))),
  401. ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>]),
  402. ok.
  403. t_get_rules_ordered_by_ts(_Config) ->
  404. Now = erlang:system_time(microsecond),
  405. ok = create_rules(
  406. [
  407. make_simple_rule_with_ts(<<"rule-debug-0">>, Now + 1),
  408. make_simple_rule_with_ts(<<"rule-debug-1">>, Now + 2),
  409. make_simple_rule_with_ts(<<"rule-debug-2">>, Now + 3)
  410. ]
  411. ),
  412. ?assertMatch(
  413. [
  414. #{id := <<"rule-debug-0">>},
  415. #{id := <<"rule-debug-1">>},
  416. #{id := <<"rule-debug-2">>}
  417. ],
  418. emqx_rule_engine:get_rules_ordered_by_ts()
  419. ).
  420. t_get_rules_for_topic_2(_Config) ->
  421. Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>)),
  422. ok = create_rules(
  423. [
  424. make_simple_rule(<<"rule-debug-1">>, _1 = <<"select * from \"simple/#\"">>),
  425. make_simple_rule(<<"rule-debug-2">>, _2 = <<"select * from \"simple/+\"">>),
  426. make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>),
  427. make_simple_rule(<<"rule-debug-4">>, _3 = <<"select * from \"simple/1\"">>),
  428. make_simple_rule(
  429. <<"rule-debug-5">>,
  430. _4 = <<"select * from \"simple/2\", \"simple/+\", \"simple/3\"">>
  431. ),
  432. make_simple_rule(
  433. <<"rule-debug-6">>,
  434. <<"select * from \"simple/2\", \"simple/3\", \"simple/4\"">>
  435. )
  436. ]
  437. ),
  438. ?assertEqual(Len0 + 4, length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>))),
  439. ok = delete_rules_by_ids([
  440. <<"rule-debug-1">>,
  441. <<"rule-debug-2">>,
  442. <<"rule-debug-3">>,
  443. <<"rule-debug-4">>,
  444. <<"rule-debug-5">>,
  445. <<"rule-debug-6">>
  446. ]),
  447. ok.
  448. t_get_rules_with_same_event(_Config) ->
  449. PubT = <<"simple/1">>,
  450. PubN = length(emqx_rule_engine:get_rules_with_same_event(PubT)),
  451. ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>)),
  452. ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/client_disconnected">>)),
  453. ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/session_subscribed">>)),
  454. ?assertEqual(
  455. [], emqx_rule_engine:get_rules_with_same_event(<<"$events/session_unsubscribed">>)
  456. ),
  457. ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>)),
  458. ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>)),
  459. ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>)),
  460. ok = create_rules(
  461. [
  462. make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>),
  463. make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>),
  464. make_simple_rule(
  465. <<"r3">>,
  466. <<"select * from \"$events/client_connected\"">>
  467. ),
  468. make_simple_rule(
  469. <<"r4">>,
  470. <<"select * from \"$events/client_disconnected\"">>
  471. ),
  472. make_simple_rule(
  473. <<"r5">>,
  474. <<"select * from \"$events/session_subscribed\"">>
  475. ),
  476. make_simple_rule(
  477. <<"r6">>,
  478. <<"select * from \"$events/session_unsubscribed\"">>
  479. ),
  480. make_simple_rule(
  481. <<"r7">>,
  482. <<"select * from \"$events/message_delivered\"">>
  483. ),
  484. make_simple_rule(
  485. <<"r8">>,
  486. <<"select * from \"$events/message_acked\"">>
  487. ),
  488. make_simple_rule(
  489. <<"r9">>,
  490. <<"select * from \"$events/message_dropped\"">>
  491. ),
  492. make_simple_rule(
  493. <<"r10">>,
  494. <<
  495. "select * from \"t/1\", "
  496. "\"$events/session_subscribed\", \"$events/client_connected\""
  497. >>
  498. )
  499. ]
  500. ),
  501. ?assertEqual(PubN + 3, length(emqx_rule_engine:get_rules_with_same_event(PubT))),
  502. ?assertEqual(
  503. 2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>))
  504. ),
  505. ?assertEqual(
  506. 1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_disconnected">>))
  507. ),
  508. ?assertEqual(
  509. 2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/session_subscribed">>))
  510. ),
  511. ?assertEqual(
  512. 1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/session_unsubscribed">>))
  513. ),
  514. ?assertEqual(
  515. 1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>))
  516. ),
  517. ?assertEqual(
  518. 1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>))
  519. ),
  520. ?assertEqual(
  521. 1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>))
  522. ),
  523. ok = delete_rules_by_ids([
  524. <<"r1">>,
  525. <<"r2">>,
  526. <<"r3">>,
  527. <<"r4">>,
  528. <<"r5">>,
  529. <<"r6">>,
  530. <<"r7">>,
  531. <<"r8">>,
  532. <<"r9">>,
  533. <<"r10">>
  534. ]),
  535. ok.
  536. t_get_rule_ids_by_action(_) ->
  537. ID = <<"t_get_rule_ids_by_action">>,
  538. Rule1 = #{
  539. id => ID,
  540. sql => <<"SELECT * FROM \"t\"">>,
  541. actions => [
  542. #{function => console, args => #{}},
  543. #{function => republish, args => #{}},
  544. <<"mqtt:my_mqtt_bridge">>,
  545. <<"mysql:foo">>
  546. ],
  547. description => ID,
  548. created_at => erlang:system_time(millisecond)
  549. },
  550. ok = create_rules([Rule1]),
  551. ?assertMatch(
  552. [ID],
  553. emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:console">>})
  554. ),
  555. ?assertMatch(
  556. [ID],
  557. emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:republish">>})
  558. ),
  559. ?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(#{function => <<"some_mod:fun">>})),
  560. ?assertMatch([ID], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:foo">>)),
  561. ?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:not_exists">>)),
  562. ok = delete_rules_by_ids([<<"t_get_rule_ids_by_action">>]).
  563. t_ensure_action_removed(_) ->
  564. Id = <<"t_ensure_action_removed">>,
  565. GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>,
  566. {ok, _} = emqx:update_config(
  567. [rule_engine, rules, Id],
  568. #{
  569. <<"actions">> => [
  570. #{<<"function">> => GetSelectedData},
  571. #{<<"function">> => <<"console">>},
  572. #{
  573. <<"function">> => <<"republish">>,
  574. <<"args">> => #{<<"topic">> => <<"some/topic">>}
  575. },
  576. <<"mysql:foo">>,
  577. <<"mqtt:bar">>
  578. ],
  579. <<"description">> => <<"">>,
  580. <<"sql">> => <<"SELECT * FROM \"t/#\"">>
  581. }
  582. ),
  583. ?assertMatch(
  584. #{
  585. <<"actions">> := [
  586. #{<<"function">> := GetSelectedData},
  587. #{<<"function">> := <<"console">>},
  588. #{<<"function">> := <<"republish">>},
  589. <<"mysql:foo">>,
  590. <<"mqtt:bar">>
  591. ]
  592. },
  593. emqx:get_raw_config([rule_engine, rules, Id])
  594. ),
  595. ok = emqx_rule_engine:ensure_action_removed(Id, #{function => <<"console">>}),
  596. ?assertMatch(
  597. #{
  598. <<"actions">> := [
  599. #{<<"function">> := GetSelectedData},
  600. #{<<"function">> := <<"republish">>},
  601. <<"mysql:foo">>,
  602. <<"mqtt:bar">>
  603. ]
  604. },
  605. emqx:get_raw_config([rule_engine, rules, Id])
  606. ),
  607. ok = emqx_rule_engine:ensure_action_removed(Id, <<"mysql:foo">>),
  608. ?assertMatch(
  609. #{
  610. <<"actions">> := [
  611. #{<<"function">> := GetSelectedData},
  612. #{<<"function">> := <<"republish">>},
  613. <<"mqtt:bar">>
  614. ]
  615. },
  616. emqx:get_raw_config([rule_engine, rules, Id])
  617. ),
  618. ok = emqx_rule_engine:ensure_action_removed(Id, #{function => GetSelectedData}),
  619. ?assertMatch(
  620. #{
  621. <<"actions">> := [
  622. #{<<"function">> := <<"republish">>},
  623. <<"mqtt:bar">>
  624. ]
  625. },
  626. emqx:get_raw_config([rule_engine, rules, Id])
  627. ),
  628. emqx:remove_config([rule_engine, rules, Id]).
  629. %%------------------------------------------------------------------------------
  630. %% Test cases for rule runtime
  631. %%------------------------------------------------------------------------------
  632. t_json_payload_decoding(_Config) ->
  633. {ok, C} = emqtt:start_link(),
  634. on_exit(fun() -> emqtt:stop(C) end),
  635. {ok, _} = emqtt:connect(C),
  636. Cases =
  637. [
  638. #{
  639. select_fields =>
  640. <<"payload.measurement, payload.data_type, payload.value, payload.device_id">>,
  641. payload => emqx_utils_json:encode(#{
  642. measurement => <<"temp">>,
  643. data_type => <<"FLOAT">>,
  644. value => <<"32.12">>,
  645. device_id => <<"devid">>
  646. }),
  647. expected => #{
  648. payload => #{
  649. <<"measurement">> => <<"temp">>,
  650. <<"data_type">> => <<"FLOAT">>,
  651. <<"value">> => <<"32.12">>,
  652. <<"device_id">> => <<"devid">>
  653. }
  654. }
  655. },
  656. %% "last write wins" examples
  657. #{
  658. select_fields => <<"payload as p, payload.f as p.answer">>,
  659. payload => emqx_utils_json:encode(#{f => 42, keep => <<"that?">>}),
  660. expected => #{
  661. <<"p">> => #{
  662. <<"answer">> => 42
  663. }
  664. }
  665. },
  666. #{
  667. select_fields => <<"payload as p, payload.f as p.jsonlike.f">>,
  668. payload => emqx_utils_json:encode(#{
  669. jsonlike => emqx_utils_json:encode(#{a => 0}),
  670. f => <<"huh">>
  671. }),
  672. %% behavior from 4.4: jsonlike gets wiped without preserving old "keys"
  673. %% here we overwrite it since we don't explicitly decode it
  674. expected => #{
  675. <<"p">> => #{
  676. <<"jsonlike">> => #{<<"f">> => <<"huh">>}
  677. }
  678. }
  679. },
  680. #{
  681. select_fields =>
  682. <<"payload as p, 42 as p, payload.measurement as p.measurement, 51 as p">>,
  683. payload => emqx_utils_json:encode(#{
  684. measurement => <<"temp">>,
  685. data_type => <<"FLOAT">>,
  686. value => <<"32.12">>,
  687. device_id => <<"devid">>
  688. }),
  689. expected => #{
  690. <<"p">> => 51
  691. }
  692. },
  693. %% if selected field is already structured, new values are inserted into it
  694. #{
  695. select_fields =>
  696. <<"json_decode(payload) as p, payload.a as p.z">>,
  697. payload => emqx_utils_json:encode(#{
  698. a => 1,
  699. b => <<"2">>
  700. }),
  701. expected => #{
  702. <<"p">> => #{
  703. <<"a">> => 1,
  704. <<"b">> => <<"2">>,
  705. <<"z">> => 1
  706. }
  707. }
  708. }
  709. ],
  710. ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>,
  711. Topic = <<"some/topic">>,
  712. ok = snabbkaffe:start_trace(),
  713. on_exit(fun() -> snabbkaffe:stop() end),
  714. on_exit(fun() -> delete_rule(?TMP_RULEID) end),
  715. lists:foreach(
  716. fun(#{select_fields := Fs, payload := P, expected := E} = Case) ->
  717. ct:pal("testing case ~p", [Case]),
  718. SQL = <<"select ", Fs/binary, " from \"", Topic/binary, "\"">>,
  719. delete_rule(?TMP_RULEID),
  720. {ok, _Rule} = emqx_rule_engine:create_rule(
  721. #{
  722. sql => SQL,
  723. id => ?TMP_RULEID,
  724. actions => [#{function => ActionFn}]
  725. }
  726. ),
  727. {_, {ok, Event}} =
  728. ?wait_async_action(
  729. emqtt:publish(C, Topic, P, 0),
  730. #{?snk_kind := action_response},
  731. 5_000
  732. ),
  733. ?assertMatch(
  734. #{selected := E},
  735. Event,
  736. #{payload => P, fields => Fs, expected => E}
  737. ),
  738. ok
  739. end,
  740. Cases
  741. ),
  742. snabbkaffe:stop(),
  743. ok.
  744. t_events(_Config) ->
  745. {ok, Client} = emqtt:start_link(
  746. [
  747. {username, <<"u_event">>},
  748. {clientid, <<"c_event">>},
  749. {proto_ver, v5},
  750. {properties, #{'Session-Expiry-Interval' => 60}}
  751. ]
  752. ),
  753. {ok, Client2} = emqtt:start_link(
  754. [
  755. {username, <<"u_event2">>},
  756. {clientid, <<"c_event2">>},
  757. {proto_ver, v5},
  758. {properties, #{'Session-Expiry-Interval' => 60}}
  759. ]
  760. ),
  761. ct:pal("====== verify $events/client_connected, $events/client_connack"),
  762. client_connected(Client, Client2),
  763. ct:pal("====== verify $events/message_dropped"),
  764. message_dropped(Client),
  765. ct:pal("====== verify $events/session_subscribed"),
  766. session_subscribed(Client2),
  767. ct:pal("====== verify t1"),
  768. message_publish(Client),
  769. ct:pal("====== verify $events/delivery_dropped"),
  770. delivery_dropped(Client),
  771. ct:pal("====== verify $events/message_delivered"),
  772. message_delivered(Client),
  773. ct:pal("====== verify $events/message_acked"),
  774. message_acked(Client),
  775. ct:pal("====== verify $events/session_unsubscribed"),
  776. session_unsubscribed(Client2),
  777. ct:pal("====== verify $events/client_disconnected"),
  778. client_disconnected(Client, Client2),
  779. ct:pal("====== verify $events/client_connack"),
  780. client_connack_failed(),
  781. ok.
  782. t_event_client_disconnected_normal(_Config) ->
  783. SQL =
  784. "select * "
  785. "from \"$events/client_disconnected\" ",
  786. RepubT = <<"repub/to/disconnected/normal">>,
  787. {ok, TopicRule} = emqx_rule_engine:create_rule(
  788. #{
  789. sql => SQL,
  790. id => ?TMP_RULEID,
  791. actions => [republish_action(RepubT, <<>>)]
  792. }
  793. ),
  794. {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
  795. {ok, _} = emqtt:connect(Client),
  796. {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
  797. ct:sleep(200),
  798. {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
  799. {ok, _} = emqtt:connect(Client1),
  800. emqtt:disconnect(Client1),
  801. receive
  802. {publish, #{topic := T, payload := Payload}} ->
  803. ?assertEqual(RepubT, T),
  804. ?assertMatch(
  805. #{<<"reason">> := <<"normal">>}, emqx_utils_json:decode(Payload, [return_maps])
  806. )
  807. after 1000 ->
  808. ct:fail(wait_for_repub_disconnected_normal)
  809. end,
  810. emqtt:stop(Client),
  811. delete_rule(TopicRule).
  812. t_event_client_disconnected_kicked(_Config) ->
  813. SQL =
  814. "select * "
  815. "from \"$events/client_disconnected\" ",
  816. RepubT = <<"repub/to/disconnected/kicked">>,
  817. {ok, TopicRule} = emqx_rule_engine:create_rule(
  818. #{
  819. sql => SQL,
  820. id => ?TMP_RULEID,
  821. actions => [republish_action(RepubT, <<>>)]
  822. }
  823. ),
  824. {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
  825. {ok, _} = emqtt:connect(Client),
  826. {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
  827. ct:sleep(200),
  828. {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
  829. {ok, _} = emqtt:connect(Client1),
  830. %% the process will receive {'EXIT',{shutdown,tcp_closed}}
  831. unlink(Client1),
  832. emqx_cm:kick_session(<<"emqx">>),
  833. receive
  834. {publish, #{topic := T, payload := Payload}} ->
  835. ?assertEqual(RepubT, T),
  836. ?assertMatch(
  837. #{<<"reason">> := <<"kicked">>}, emqx_utils_json:decode(Payload, [return_maps])
  838. )
  839. after 1000 ->
  840. ct:fail(wait_for_repub_disconnected_kicked)
  841. end,
  842. emqtt:stop(Client),
  843. delete_rule(TopicRule).
  844. t_event_client_disconnected_discarded(_Config) ->
  845. SQL =
  846. "select * "
  847. "from \"$events/client_disconnected\" ",
  848. RepubT = <<"repub/to/disconnected/discarded">>,
  849. {ok, TopicRule} = emqx_rule_engine:create_rule(
  850. #{
  851. sql => SQL,
  852. id => ?TMP_RULEID,
  853. actions => [republish_action(RepubT, <<>>)]
  854. }
  855. ),
  856. {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
  857. {ok, _} = emqtt:connect(Client),
  858. {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
  859. ct:sleep(200),
  860. {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
  861. {ok, _} = emqtt:connect(Client1),
  862. %% the process will receive {'EXIT',{shutdown,tcp_closed}}
  863. unlink(Client1),
  864. {ok, Client2} = emqtt:start_link([
  865. {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}
  866. ]),
  867. {ok, _} = emqtt:connect(Client2),
  868. receive
  869. {publish, #{topic := T, payload := Payload}} ->
  870. ?assertEqual(RepubT, T),
  871. ?assertMatch(
  872. #{<<"reason">> := <<"discarded">>}, emqx_utils_json:decode(Payload, [return_maps])
  873. )
  874. after 1000 ->
  875. ct:fail(wait_for_repub_disconnected_discarded)
  876. end,
  877. emqtt:stop(Client),
  878. emqtt:stop(Client2),
  879. delete_rule(TopicRule).
  880. t_event_client_disconnected_takenover(_Config) ->
  881. SQL =
  882. "select * "
  883. "from \"$events/client_disconnected\" ",
  884. RepubT = <<"repub/to/disconnected/takenover">>,
  885. {ok, TopicRule} = emqx_rule_engine:create_rule(
  886. #{
  887. sql => SQL,
  888. id => ?TMP_RULEID,
  889. actions => [republish_action(RepubT, <<>>)]
  890. }
  891. ),
  892. {ok, ClientRecv} = emqtt:start_link([
  893. {clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}
  894. ]),
  895. {ok, _} = emqtt:connect(ClientRecv),
  896. {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0),
  897. ct:sleep(200),
  898. {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
  899. {ok, _} = emqtt:connect(Client1),
  900. %% the process will receive {'EXIT',{shutdown,tcp_closed}}
  901. unlink(Client1),
  902. {ok, Client2} = emqtt:start_link([
  903. {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}
  904. ]),
  905. {ok, _} = emqtt:connect(Client2),
  906. receive
  907. {publish, #{topic := T, payload := Payload}} ->
  908. ?assertEqual(RepubT, T),
  909. ?assertMatch(
  910. #{<<"reason">> := <<"takenover">>}, emqx_utils_json:decode(Payload, [return_maps])
  911. )
  912. after 1000 ->
  913. ct:fail(wait_for_repub_disconnected_discarded)
  914. end,
  915. emqtt:stop(ClientRecv),
  916. emqtt:stop(Client2),
  917. delete_rule(TopicRule).
  918. t_event_client_disconnected_takenover_2(_Config) ->
  919. SQL =
  920. "select * "
  921. "from \"$events/client_disconnected\" ",
  922. RepubT = <<"repub/to/disconnected/takenover">>,
  923. {ok, TopicRule} = emqx_rule_engine:create_rule(
  924. #{
  925. sql => SQL,
  926. id => ?TMP_RULEID,
  927. actions => [republish_action(RepubT, <<>>)]
  928. }
  929. ),
  930. {ok, ClientRecv} = emqtt:start_link([
  931. {clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}
  932. ]),
  933. {ok, _} = emqtt:connect(ClientRecv),
  934. {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0),
  935. ct:sleep(200),
  936. {ok, Client1} = emqtt:start_link([
  937. {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}
  938. ]),
  939. {ok, _} = emqtt:connect(Client1),
  940. ok = emqtt:disconnect(Client1),
  941. %% receive the normal disconnected event
  942. receive
  943. {publish, #{topic := T, payload := Payload}} ->
  944. ?assertEqual(RepubT, T),
  945. ?assertMatch(
  946. #{<<"reason">> := <<"normal">>}, emqx_utils_json:decode(Payload, [return_maps])
  947. )
  948. after 1000 ->
  949. ct:fail(wait_for_repub_disconnected_discarded)
  950. end,
  951. {ok, Client2} = emqtt:start_link([
  952. {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}
  953. ]),
  954. {ok, _} = emqtt:connect(Client2),
  955. %% should not receive the takenoverdisconnected event
  956. receive
  957. {publish, #{topic := T1, payload := Payload1}} ->
  958. ?assertEqual(RepubT, T1),
  959. ?assertMatch(
  960. #{<<"reason">> := <<"takenover">>}, emqx_utils_json:decode(Payload1, [return_maps])
  961. ),
  962. ct:fail(wait_for_repub_disconnected_discarded)
  963. after 1000 ->
  964. ok
  965. end,
  966. emqtt:stop(ClientRecv),
  967. emqtt:stop(Client2),
  968. delete_rule(TopicRule).
  969. client_connack_failed() ->
  970. {ok, Client} = emqtt:start_link(
  971. [
  972. {username, <<"u_event3">>},
  973. {clientid, <<"c_event3">>},
  974. {proto_ver, v5},
  975. {properties, #{'Session-Expiry-Interval' => 60}}
  976. ]
  977. ),
  978. try
  979. meck:new(emqx_access_control, [non_strict, passthrough]),
  980. meck:expect(
  981. emqx_access_control,
  982. authenticate,
  983. fun(_) -> {error, bad_username_or_password} end
  984. ),
  985. process_flag(trap_exit, true),
  986. ?assertMatch({error, _}, emqtt:connect(Client)),
  987. timer:sleep(300),
  988. verify_event('client.connack')
  989. after
  990. meck:unload(emqx_access_control)
  991. end,
  992. ok.
  993. message_publish(Client) ->
  994. emqtt:publish(
  995. Client,
  996. <<"t1">>,
  997. #{'Message-Expiry-Interval' => 60},
  998. <<"{\"id\": 1, \"name\": \"ha\"}">>,
  999. [{qos, 1}]
  1000. ),
  1001. verify_event('message.publish'),
  1002. ok.
  1003. client_connected(Client, Client2) ->
  1004. {ok, _} = emqtt:connect(Client),
  1005. {ok, _} = emqtt:connect(Client2),
  1006. verify_event('client.connack'),
  1007. verify_event('client.connected'),
  1008. ok.
  1009. client_disconnected(Client, Client2) ->
  1010. ok = emqtt:disconnect(Client, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}),
  1011. ok = emqtt:disconnect(Client2, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}),
  1012. verify_event('client.disconnected'),
  1013. ok.
  1014. session_subscribed(Client2) ->
  1015. {ok, _, _} = emqtt:subscribe(
  1016. Client2, #{'User-Property' => {<<"topic_name">>, <<"t1">>}}, <<"t1">>, 1
  1017. ),
  1018. verify_event('session.subscribed'),
  1019. verify_event('client.check_authz_complete'),
  1020. ok.
  1021. session_unsubscribed(Client2) ->
  1022. {ok, _, _} = emqtt:unsubscribe(
  1023. Client2, #{'User-Property' => {<<"topic_name">>, <<"t1">>}}, <<"t1">>
  1024. ),
  1025. verify_event('session.unsubscribed'),
  1026. ok.
  1027. message_delivered(_Client) ->
  1028. verify_event('message.delivered'),
  1029. ok.
  1030. delivery_dropped(Client) ->
  1031. %% subscribe "t1" and then publish to "t1", the message will not be received by itself
  1032. %% because we have set the subscribe flag 'nl' = true
  1033. {ok, _, _} = emqtt:subscribe(Client, #{}, <<"t1">>, [{nl, true}, {qos, 1}]),
  1034. ct:sleep(50),
  1035. message_publish(Client),
  1036. ct:pal("--- current emqx hooks: ~p", [ets:tab2list(emqx_hooks)]),
  1037. verify_event('delivery.dropped'),
  1038. ok.
  1039. message_dropped(Client) ->
  1040. message_publish(Client),
  1041. verify_event('message.dropped'),
  1042. ok.
  1043. message_acked(_Client) ->
  1044. verify_event('message.acked'),
  1045. ok.
  1046. t_match_atom_and_binary(_Config) ->
  1047. SQL =
  1048. "SELECT connected_at as ts, * "
  1049. "FROM \"$events/client_connected\" "
  1050. "WHERE username = 'emqx2' ",
  1051. Repub = republish_action(<<"t2">>, <<"user:${ts}">>),
  1052. {ok, TopicRule} = emqx_rule_engine:create_rule(
  1053. #{
  1054. sql => SQL,
  1055. id => ?TMP_RULEID,
  1056. actions => [Repub]
  1057. }
  1058. ),
  1059. {ok, Client} = emqtt:start_link([{username, <<"emqx1">>}]),
  1060. {ok, _} = emqtt:connect(Client),
  1061. {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
  1062. ct:sleep(100),
  1063. {ok, Client2} = emqtt:start_link([{username, <<"emqx2">>}]),
  1064. {ok, _} = emqtt:connect(Client2),
  1065. receive
  1066. {publish, #{topic := T, payload := Payload}} ->
  1067. ?assertEqual(<<"t2">>, T),
  1068. <<"user:", ConnAt/binary>> = Payload,
  1069. _ = binary_to_integer(ConnAt)
  1070. after 1000 ->
  1071. ct:fail(wait_for_t2)
  1072. end,
  1073. emqtt:stop(Client),
  1074. delete_rule(TopicRule).
  1075. t_sqlselect_0(_Config) ->
  1076. %% Verify SELECT with and without 'AS'
  1077. Sql =
  1078. "select * "
  1079. "from \"t/#\" "
  1080. "where payload.cmd.info = 'tt'",
  1081. ?assertMatch(
  1082. {ok, #{payload := <<"{\"cmd\": {\"info\":\"tt\"}}">>}},
  1083. emqx_rule_sqltester:test(
  1084. #{
  1085. sql => Sql,
  1086. context =>
  1087. #{
  1088. payload =>
  1089. <<"{\"cmd\": {\"info\":\"tt\"}}">>,
  1090. topic => <<"t/a">>
  1091. }
  1092. }
  1093. )
  1094. ),
  1095. Sql2 =
  1096. "select payload.cmd as cmd "
  1097. "from \"t/#\" "
  1098. "where cmd.info = 'tt'",
  1099. ?assertMatch(
  1100. {ok, #{<<"cmd">> := #{<<"info">> := <<"tt">>}}},
  1101. emqx_rule_sqltester:test(
  1102. #{
  1103. sql => Sql2,
  1104. context =>
  1105. #{
  1106. payload =>
  1107. <<"{\"cmd\": {\"info\":\"tt\"}}">>,
  1108. topic => <<"t/a">>
  1109. }
  1110. }
  1111. )
  1112. ),
  1113. Sql3 =
  1114. "select payload.cmd as cmd, cmd.info as info "
  1115. "from \"t/#\" "
  1116. "where cmd.info = 'tt' and info = 'tt'",
  1117. ?assertMatch(
  1118. {ok, #{
  1119. <<"cmd">> := #{<<"info">> := <<"tt">>},
  1120. <<"info">> := <<"tt">>
  1121. }},
  1122. emqx_rule_sqltester:test(
  1123. #{
  1124. sql => Sql3,
  1125. context =>
  1126. #{
  1127. payload =>
  1128. <<"{\"cmd\": {\"info\":\"tt\"}}">>,
  1129. topic => <<"t/a">>
  1130. }
  1131. }
  1132. )
  1133. ),
  1134. %% cascaded as
  1135. Sql4 =
  1136. "select payload.cmd as cmd, cmd.info as meta.info "
  1137. "from \"t/#\" "
  1138. "where cmd.info = 'tt' and meta.info = 'tt'",
  1139. ?assertMatch(
  1140. {ok, #{
  1141. <<"cmd">> := #{<<"info">> := <<"tt">>},
  1142. <<"meta">> := #{<<"info">> := <<"tt">>}
  1143. }},
  1144. emqx_rule_sqltester:test(
  1145. #{
  1146. sql => Sql4,
  1147. context =>
  1148. #{
  1149. payload =>
  1150. <<"{\"cmd\": {\"info\":\"tt\"}}">>,
  1151. topic => <<"t/a">>
  1152. }
  1153. }
  1154. )
  1155. ).
  1156. t_sqlselect_00(_Config) ->
  1157. %% Verify plus/subtract and unary_add_or_subtract
  1158. Sql =
  1159. "select 1-1 as a "
  1160. "from \"t/#\" ",
  1161. ?assertMatch(
  1162. {ok, #{<<"a">> := 0}},
  1163. emqx_rule_sqltester:test(
  1164. #{
  1165. sql => Sql,
  1166. context =>
  1167. #{
  1168. payload => <<"">>,
  1169. topic => <<"t/a">>
  1170. }
  1171. }
  1172. )
  1173. ),
  1174. Sql1 =
  1175. "select -1 + 1 as a "
  1176. "from \"t/#\" ",
  1177. ?assertMatch(
  1178. {ok, #{<<"a">> := 0}},
  1179. emqx_rule_sqltester:test(
  1180. #{
  1181. sql => Sql1,
  1182. context =>
  1183. #{
  1184. payload => <<"">>,
  1185. topic => <<"t/a">>
  1186. }
  1187. }
  1188. )
  1189. ),
  1190. Sql2 =
  1191. "select 1 + 1 as a "
  1192. "from \"t/#\" ",
  1193. ?assertMatch(
  1194. {ok, #{<<"a">> := 2}},
  1195. emqx_rule_sqltester:test(
  1196. #{
  1197. sql => Sql2,
  1198. context =>
  1199. #{
  1200. payload => <<"">>,
  1201. topic => <<"t/a">>
  1202. }
  1203. }
  1204. )
  1205. ),
  1206. Sql3 =
  1207. "select +1 as a "
  1208. "from \"t/#\" ",
  1209. ?assertMatch(
  1210. {ok, #{<<"a">> := 1}},
  1211. emqx_rule_sqltester:test(
  1212. #{
  1213. sql => Sql3,
  1214. context =>
  1215. #{
  1216. payload => <<"">>,
  1217. topic => <<"t/a">>
  1218. }
  1219. }
  1220. )
  1221. ).
  1222. t_sqlselect_with_3rd_party_impl(_Config) ->
  1223. Sql =
  1224. "select * from \"t/#\" where emqx_rule_funcs_demo.is_my_topic(topic)",
  1225. T = fun(Topic) ->
  1226. emqx_rule_sqltester:test(
  1227. #{
  1228. sql => Sql,
  1229. context =>
  1230. #{
  1231. payload => #{<<"what">> => 0},
  1232. topic => Topic
  1233. }
  1234. }
  1235. )
  1236. end,
  1237. ?assertMatch({ok, _}, T(<<"t/2/3/4/5">>)),
  1238. ?assertMatch({error, nomatch}, T(<<"t/1">>)).
  1239. t_sqlselect_with_3rd_party_impl2(_Config) ->
  1240. Sql = fun(N) ->
  1241. "select emqx_rule_funcs_demo.duplicate_payload(payload," ++ integer_to_list(N) ++
  1242. ") as payload_list from \"t/#\""
  1243. end,
  1244. T = fun(Payload, N) ->
  1245. emqx_rule_sqltester:test(
  1246. #{
  1247. sql => Sql(N),
  1248. context =>
  1249. #{
  1250. payload => Payload,
  1251. topic => <<"t/a">>
  1252. }
  1253. }
  1254. )
  1255. end,
  1256. ?assertMatch({ok, #{<<"payload_list">> := [_, _]}}, T(<<"payload1">>, 2)),
  1257. ?assertMatch({ok, #{<<"payload_list">> := [_, _, _]}}, T(<<"payload1">>, 3)),
  1258. %% crash
  1259. ?assertMatch({error, {select_and_transform_error, _}}, T(<<"payload1">>, 4)).
  1260. t_sqlselect_with_3rd_party_funcs_unknown(_Config) ->
  1261. Sql = "select emqx_rule_funcs_demo_no_such_module.foo(payload) from \"t/#\"",
  1262. ?assertMatch(
  1263. {error,
  1264. {select_and_transform_error,
  1265. {throw, #{reason := sql_function_provider_module_not_loaded}, _}}},
  1266. emqx_rule_sqltester:test(
  1267. #{
  1268. sql => Sql,
  1269. context => #{payload => <<"a">>, topic => <<"t/a">>}
  1270. }
  1271. )
  1272. ).
  1273. t_sqlselect_001(_Config) ->
  1274. %% Verify that the jq function can be called from SQL
  1275. Sql =
  1276. "select jq('.what + .what', payload) as ans "
  1277. "from \"t/#\" ",
  1278. ?assertMatch(
  1279. {ok, #{<<"ans">> := [8]}},
  1280. emqx_rule_sqltester:test(
  1281. #{
  1282. sql => Sql,
  1283. context =>
  1284. #{
  1285. payload => #{<<"what">> => 4},
  1286. topic => <<"t/a">>
  1287. }
  1288. }
  1289. )
  1290. ),
  1291. Sql2 =
  1292. "SELECT jq('.a|.[]', "
  1293. "'{\"a\": [{\"b\": 1}, {\"b\": 2}, {\"b\": 3}]}') "
  1294. "as jq_action, "
  1295. " jq_action[1].b as first_b from \"t/#\" ",
  1296. ?assertMatch(
  1297. {ok, #{<<"first_b">> := 1}},
  1298. emqx_rule_sqltester:test(
  1299. #{
  1300. sql => Sql2,
  1301. context =>
  1302. #{
  1303. payload => #{<<"what">> => 4},
  1304. topic => <<"t/a">>
  1305. }
  1306. }
  1307. )
  1308. ).
  1309. t_sqlselect_002(_Config) ->
  1310. %% Verify that the div and mod can be used both as infix operations and as
  1311. %% function calls
  1312. Sql =
  1313. ""
  1314. "select 2 mod 2 as mod1,\n"
  1315. " mod(3, 2) as mod2,\n"
  1316. " 4 div 2 as div1,\n"
  1317. " div(7, 2) as div2\n"
  1318. " from \"t/#\" "
  1319. "",
  1320. ?assertMatch(
  1321. {ok, #{
  1322. <<"mod1">> := 0,
  1323. <<"mod2">> := 1,
  1324. <<"div1">> := 2,
  1325. <<"div2">> := 3
  1326. }},
  1327. emqx_rule_sqltester:test(
  1328. #{
  1329. sql => Sql,
  1330. context =>
  1331. #{
  1332. payload => #{<<"what">> => 4},
  1333. topic => <<"t/a">>
  1334. }
  1335. }
  1336. )
  1337. ).
  1338. t_sqlselect_inject_props(_Config) ->
  1339. SQL =
  1340. "SELECT json_decode(payload) as p, payload, "
  1341. "map_put('inject_key', 'inject_val', user_properties) as user_properties "
  1342. "FROM \"t3/#\", \"t1\" "
  1343. "WHERE p.x = 1",
  1344. Repub = republish_action(<<"t2">>),
  1345. {ok, TopicRule1} = emqx_rule_engine:create_rule(
  1346. #{
  1347. sql => SQL,
  1348. id => ?TMP_RULEID,
  1349. actions => [Repub]
  1350. }
  1351. ),
  1352. {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
  1353. {ok, _} = emqtt:connect(Client),
  1354. {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
  1355. emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]),
  1356. receive
  1357. {publish, #{topic := T, payload := Payload, properties := Props}} ->
  1358. ?assertEqual(user_properties(#{<<"inject_key">> => <<"inject_val">>}), Props),
  1359. ?assertEqual(<<"t2">>, T),
  1360. ?assertEqual(<<"{\"x\":1}">>, Payload)
  1361. after 2000 ->
  1362. ct:fail(wait_for_t2)
  1363. end,
  1364. emqtt:stop(Client),
  1365. delete_rule(TopicRule1).
  1366. t_sqlselect_01(_Config) ->
  1367. SQL =
  1368. "SELECT json_decode(payload) as p, payload "
  1369. "FROM \"t3/#\", \"t1\" "
  1370. "WHERE p.x = 1",
  1371. Repub = republish_action(<<"t2">>, <<"${payload}">>, <<"${pub_props.'User-Property'}">>),
  1372. {ok, TopicRule1} = emqx_rule_engine:create_rule(
  1373. #{
  1374. sql => SQL,
  1375. id => ?TMP_RULEID,
  1376. actions => [Repub]
  1377. }
  1378. ),
  1379. Props = user_properties(#{<<"mykey">> => <<"myval">>}),
  1380. {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
  1381. {ok, _} = emqtt:connect(Client),
  1382. {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
  1383. emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
  1384. receive
  1385. {publish, #{topic := T, payload := Payload}} ->
  1386. ?assertEqual(<<"t2">>, T),
  1387. ?assertEqual(<<"{\"x\":1}">>, Payload)
  1388. after 2000 ->
  1389. ct:fail(wait_for_t2)
  1390. end,
  1391. emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":2}">>, [{qos, 0}]),
  1392. receive
  1393. {publish, #{topic := <<"t2">>, payload := _}} ->
  1394. ct:fail(unexpected_t2)
  1395. after 2000 ->
  1396. ok
  1397. end,
  1398. emqtt:publish(Client, <<"t3/a">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
  1399. receive
  1400. {publish, #{topic := T3, payload := Payload3, properties := Props2}} ->
  1401. ?assertEqual(Props, Props2),
  1402. ?assertEqual(<<"t2">>, T3),
  1403. ?assertEqual(<<"{\"x\":1}">>, Payload3)
  1404. after 2000 ->
  1405. ct:fail(wait_for_t3)
  1406. end,
  1407. emqtt:stop(Client),
  1408. delete_rule(TopicRule1).
  1409. t_sqlselect_02(_Config) ->
  1410. SQL =
  1411. "SELECT * "
  1412. "FROM \"t3/#\", \"t1\" "
  1413. "WHERE payload.x = 1",
  1414. Repub = republish_action(<<"t2">>),
  1415. {ok, TopicRule1} = emqx_rule_engine:create_rule(
  1416. #{
  1417. sql => SQL,
  1418. id => ?TMP_RULEID,
  1419. actions => [Repub]
  1420. }
  1421. ),
  1422. {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
  1423. {ok, _} = emqtt:connect(Client),
  1424. {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
  1425. emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
  1426. ct:sleep(100),
  1427. receive
  1428. {publish, #{topic := T, payload := Payload}} ->
  1429. ?assertEqual(<<"t2">>, T),
  1430. ?assertEqual(<<"{\"x\":1}">>, Payload)
  1431. after 1000 ->
  1432. ct:fail(wait_for_t2)
  1433. end,
  1434. emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
  1435. receive
  1436. {publish, #{topic := <<"t2">>, payload := Payload0}} ->
  1437. ct:fail({unexpected_t2, Payload0})
  1438. after 1000 ->
  1439. ok
  1440. end,
  1441. emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
  1442. receive
  1443. {publish, #{topic := T3, payload := Payload3}} ->
  1444. ?assertEqual(<<"t2">>, T3),
  1445. ?assertEqual(<<"{\"x\":1}">>, Payload3)
  1446. after 1000 ->
  1447. ct:fail(wait_for_t2)
  1448. end,
  1449. emqtt:stop(Client),
  1450. delete_rule(TopicRule1).
  1451. t_sqlselect_03(_Config) ->
  1452. init_events_counters(),
  1453. SQL = "SELECT * FROM \"t/r\" ",
  1454. Repub = republish_action(
  1455. <<"t/republish">>,
  1456. <<"${.}">>,
  1457. <<"${pub_props.'User-Property'}">>,
  1458. #{
  1459. <<"Payload-Format-Indicator">> => <<"${.payload.pfi}">>,
  1460. <<"Message-Expiry-Interval">> => <<"${.payload.mei}">>,
  1461. <<"Content-Type">> => <<"${.payload.ct}">>,
  1462. <<"Response-Topic">> => <<"${.payload.rt}">>,
  1463. <<"Correlation-Data">> => <<"${.payload.cd}">>
  1464. }
  1465. ),
  1466. RepubRaw = emqx_utils_maps:binary_key_map(Repub#{function => <<"republish">>}),
  1467. ct:pal("republish action raw:\n ~p", [RepubRaw]),
  1468. RuleRaw = #{
  1469. <<"sql">> => SQL,
  1470. <<"actions">> => [RepubRaw]
  1471. },
  1472. {ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw, #{}),
  1473. on_exit(fun() -> emqx_rule_engine:delete_rule(?TMP_RULEID) end),
  1474. %% to check what republish is actually producing without loss of information
  1475. SQL1 = "select * from \"t/republish\" ",
  1476. RuleId0 = ?TMP_RULEID,
  1477. RuleId1 = <<RuleId0/binary, "2">>,
  1478. {ok, _} = emqx_rule_engine:create_rule(
  1479. #{
  1480. sql => SQL1,
  1481. id => RuleId1,
  1482. actions => [
  1483. #{
  1484. function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>,
  1485. args => #{}
  1486. }
  1487. ]
  1488. }
  1489. ),
  1490. on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId1) end),
  1491. UserProps = maps:to_list(#{<<"mykey">> => <<"myval">>}),
  1492. Payload =
  1493. emqx_utils_json:encode(
  1494. #{
  1495. pfi => 1,
  1496. mei => 2,
  1497. ct => <<"3">>,
  1498. rt => <<"4">>,
  1499. cd => <<"5">>
  1500. }
  1501. ),
  1502. {ok, Client} = emqtt:start_link([
  1503. {username, <<"emqx">>},
  1504. {proto_ver, v5},
  1505. {properties, #{'Topic-Alias-Maximum' => 100}}
  1506. ]),
  1507. on_exit(fun() -> emqtt:stop(Client) end),
  1508. {ok, _} = emqtt:connect(Client),
  1509. {ok, _, _} = emqtt:subscribe(Client, <<"t/republish">>, 0),
  1510. PubProps = #{'User-Property' => UserProps},
  1511. ExpectedMQTTProps0 = #{
  1512. 'Payload-Format-Indicator' => 1,
  1513. 'Message-Expiry-Interval' => 2,
  1514. 'Content-Type' => <<"3">>,
  1515. 'Response-Topic' => <<"4">>,
  1516. 'Correlation-Data' => <<"5">>,
  1517. %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
  1518. %% so the channel controls those aliases on its own, starting from 1.
  1519. 'Topic-Alias' => 1,
  1520. 'User-Property' => UserProps
  1521. },
  1522. emqtt:publish(Client, <<"t/r">>, PubProps, Payload, [{qos, 0}]),
  1523. receive
  1524. {publish, #{topic := <<"t/republish">>, properties := Props1}} ->
  1525. ?assertEqual(ExpectedMQTTProps0, Props1),
  1526. ok
  1527. after 2000 ->
  1528. ct:pal("mailbox:\n ~p", [?drainMailbox()]),
  1529. ct:fail("message not republished (l. ~b)", [?LINE])
  1530. end,
  1531. ExpectedMQTTProps1 = #{
  1532. 'Payload-Format-Indicator' => 1,
  1533. 'Message-Expiry-Interval' => 2,
  1534. 'Content-Type' => <<"3">>,
  1535. 'Response-Topic' => <<"4">>,
  1536. 'Correlation-Data' => <<"5">>,
  1537. 'User-Property' => maps:from_list(UserProps),
  1538. 'User-Property-Pairs' => [
  1539. #{key => K, value => V}
  1540. || {K, V} <- UserProps
  1541. ]
  1542. },
  1543. ?assertMatch(
  1544. [
  1545. {'message.publish', #{
  1546. topic := <<"t/republish">>,
  1547. pub_props := ExpectedMQTTProps1
  1548. }}
  1549. ],
  1550. ets:lookup(events_record_tab, 'message.publish'),
  1551. #{expected_props => ExpectedMQTTProps1}
  1552. ),
  1553. ct:pal("testing payload that is not a json object"),
  1554. emqtt:publish(Client, <<"t/r">>, PubProps, <<"not-a-map">>, [{qos, 0}]),
  1555. ExpectedMQTTProps2 = #{
  1556. 'Content-Type' => <<"undefined">>,
  1557. 'Correlation-Data' => <<"undefined">>,
  1558. 'Response-Topic' => <<"undefined">>,
  1559. %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
  1560. %% so the channel controls those aliases on its own, starting from 1.
  1561. 'Topic-Alias' => 1,
  1562. 'User-Property' => UserProps
  1563. },
  1564. receive
  1565. {publish, #{topic := T1, properties := Props2}} ->
  1566. ?assertEqual(ExpectedMQTTProps2, Props2),
  1567. %% empty this time, due to topic alias set before
  1568. ?assertEqual(<<>>, T1),
  1569. ok
  1570. after 2000 ->
  1571. ct:pal("mailbox:\n ~p", [?drainMailbox()]),
  1572. ct:fail("message not republished (l. ~b)", [?LINE])
  1573. end,
  1574. ct:pal("testing payload with some uncoercible keys"),
  1575. ets:delete_all_objects(events_record_tab),
  1576. Payload1 =
  1577. emqx_utils_json:encode(#{
  1578. pfi => <<"bad_value1">>,
  1579. mei => <<"bad_value2">>,
  1580. ct => <<"some_value3">>,
  1581. rt => <<"some_value4">>,
  1582. cd => <<"some_value5">>
  1583. }),
  1584. emqtt:publish(Client, <<"t/r">>, PubProps, Payload1, [{qos, 0}]),
  1585. ExpectedMQTTProps3 = #{
  1586. %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
  1587. %% so the channel controls those aliases on its own, starting from 1.
  1588. 'Topic-Alias' => 1,
  1589. 'Content-Type' => <<"some_value3">>,
  1590. 'Response-Topic' => <<"some_value4">>,
  1591. 'Correlation-Data' => <<"some_value5">>,
  1592. 'User-Property' => UserProps
  1593. },
  1594. receive
  1595. {publish, #{topic := T2, properties := Props3}} ->
  1596. ?assertEqual(ExpectedMQTTProps3, Props3),
  1597. %% empty this time, due to topic alias set before
  1598. ?assertEqual(<<>>, T2),
  1599. ok
  1600. after 2000 ->
  1601. ct:pal("mailbox:\n ~p", [?drainMailbox()]),
  1602. ct:fail("message not republished (l. ~b)", [?LINE])
  1603. end,
  1604. ExpectedMQTTProps4 = #{
  1605. 'Content-Type' => <<"some_value3">>,
  1606. 'Response-Topic' => <<"some_value4">>,
  1607. 'Correlation-Data' => <<"some_value5">>,
  1608. 'User-Property' => maps:from_list(UserProps),
  1609. 'User-Property-Pairs' => [
  1610. #{key => K, value => V}
  1611. || {K, V} <- UserProps
  1612. ]
  1613. },
  1614. ?assertMatch(
  1615. [
  1616. {'message.publish', #{
  1617. topic := <<"t/republish">>,
  1618. pub_props := ExpectedMQTTProps4
  1619. }}
  1620. ],
  1621. ets:lookup(events_record_tab, 'message.publish'),
  1622. #{expected_props => ExpectedMQTTProps4}
  1623. ),
  1624. ct:pal("testing a payload with a more complex placeholder"),
  1625. Repub1 = republish_action(
  1626. <<"t/republish">>,
  1627. <<"${.}">>,
  1628. <<"${pub_props.'User-Property'}">>,
  1629. #{
  1630. %% Note: `Payload-Format-Indicator' is capped at 225.
  1631. <<"Payload-Format-Indicator">> => <<"1${.payload.pfi}3">>,
  1632. <<"Message-Expiry-Interval">> => <<"9${.payload.mei}6">>
  1633. }
  1634. ),
  1635. RepubRaw1 = emqx_utils_maps:binary_key_map(Repub1#{function => <<"republish">>}),
  1636. ct:pal("republish action raw:\n ~p", [RepubRaw1]),
  1637. RuleRaw1 = #{
  1638. <<"sql">> => SQL,
  1639. <<"actions">> => [RepubRaw1]
  1640. },
  1641. {ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw1, #{}),
  1642. Payload2 =
  1643. emqx_utils_json:encode(#{
  1644. pfi => <<"2">>,
  1645. mei => <<"87">>
  1646. }),
  1647. emqtt:publish(Client, <<"t/r">>, PubProps, Payload2, [{qos, 0}]),
  1648. ExpectedMQTTProps5 = #{
  1649. %% Note: PFI should be 0 or 1 according to spec, but we don't validate this when
  1650. %% serializing nor parsing...
  1651. 'Payload-Format-Indicator' => 123,
  1652. 'Message-Expiry-Interval' => 9876,
  1653. %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
  1654. %% so the channel controls those aliases on its own, starting from 1.
  1655. 'Topic-Alias' => 1,
  1656. 'User-Property' => UserProps
  1657. },
  1658. receive
  1659. {publish, #{topic := T3, properties := Props4}} ->
  1660. ?assertEqual(ExpectedMQTTProps5, Props4),
  1661. %% empty this time, due to topic alias set before
  1662. ?assertEqual(<<>>, T3),
  1663. ok
  1664. after 2000 ->
  1665. ct:pal("mailbox:\n ~p", [?drainMailbox()]),
  1666. ct:fail("message not republished (l. ~b)", [?LINE])
  1667. end,
  1668. ct:pal("testing payload-format-indicator cap"),
  1669. Payload3 =
  1670. emqx_utils_json:encode(#{
  1671. pfi => <<"999999">>,
  1672. mei => <<"87">>
  1673. }),
  1674. emqtt:publish(Client, <<"t/r">>, PubProps, Payload3, [{qos, 0}]),
  1675. ExpectedMQTTProps6 = #{
  1676. %% Note: PFI should be 0 or 1 according to spec, but we don't validate this when
  1677. %% serializing nor parsing...
  1678. %% Note: PFI is capped at 16#FF
  1679. 'Payload-Format-Indicator' => 16#FF band 19999993,
  1680. 'Message-Expiry-Interval' => 9876,
  1681. %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
  1682. %% so the channel controls those aliases on its own, starting from 1.
  1683. 'Topic-Alias' => 1,
  1684. 'User-Property' => UserProps
  1685. },
  1686. receive
  1687. {publish, #{topic := T4, properties := Props5}} ->
  1688. ?assertEqual(ExpectedMQTTProps6, Props5),
  1689. %% empty this time, due to topic alias set before
  1690. ?assertEqual(<<>>, T4),
  1691. ok
  1692. after 2000 ->
  1693. ct:pal("mailbox:\n ~p", [?drainMailbox()]),
  1694. ct:fail("message not republished (l. ~b)", [?LINE])
  1695. end,
  1696. ok.
  1697. t_sqlselect_1(_Config) ->
  1698. SQL =
  1699. "SELECT json_decode(payload) as p, payload "
  1700. "FROM \"t1\" "
  1701. "WHERE p.x = 1 and p.y = 2",
  1702. Repub = republish_action(<<"t2">>),
  1703. {ok, TopicRule} = emqx_rule_engine:create_rule(
  1704. #{
  1705. sql => SQL,
  1706. id => ?TMP_RULEID,
  1707. actions => [Repub]
  1708. }
  1709. ),
  1710. {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
  1711. {ok, _} = emqtt:connect(Client),
  1712. {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
  1713. emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0),
  1714. receive
  1715. {publish, #{topic := T, payload := Payload}} ->
  1716. ?assertEqual(<<"t2">>, T),
  1717. ?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload)
  1718. after 2000 ->
  1719. ct:fail(wait_for_t2)
  1720. end,
  1721. emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0),
  1722. receive
  1723. {publish, #{topic := <<"t2">>, payload := _}} ->
  1724. ct:fail(unexpected_t2)
  1725. after 1000 ->
  1726. ok
  1727. end,
  1728. emqtt:stop(Client),
  1729. delete_rule(TopicRule).
  1730. t_sqlselect_2(_Config) ->
  1731. %% recursively republish to t2
  1732. SQL = "SELECT * FROM \"t2\" ",
  1733. Repub = republish_action(<<"t2">>),
  1734. {ok, TopicRule} = emqx_rule_engine:create_rule(
  1735. #{
  1736. sql => SQL,
  1737. id => ?TMP_RULEID,
  1738. actions => [Repub]
  1739. }
  1740. ),
  1741. {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
  1742. {ok, _} = emqtt:connect(Client),
  1743. {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
  1744. emqtt:publish(Client, <<"t2">>, <<"{\"x\":1,\"y\":144}">>, 0),
  1745. Fun = fun() ->
  1746. receive
  1747. {publish, #{topic := <<"t2">>, payload := _}} ->
  1748. received_t2
  1749. after 500 ->
  1750. received_nothing
  1751. end
  1752. end,
  1753. received_t2 = Fun(),
  1754. received_t2 = Fun(),
  1755. received_nothing = Fun(),
  1756. emqtt:stop(Client),
  1757. delete_rule(TopicRule).
  1758. t_sqlselect_3(_Config) ->
  1759. %% republish the client.connected msg
  1760. SQL =
  1761. "SELECT * "
  1762. "FROM \"$events/client_connected\" "
  1763. "WHERE username = 'emqx1'",
  1764. Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>),
  1765. {ok, TopicRule} = emqx_rule_engine:create_rule(
  1766. #{
  1767. sql => SQL,
  1768. id => ?TMP_RULEID,
  1769. actions => [Repub]
  1770. }
  1771. ),
  1772. {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
  1773. {ok, _} = emqtt:connect(Client),
  1774. {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
  1775. {ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
  1776. {ok, _} = emqtt:connect(Client1),
  1777. receive
  1778. {publish, #{topic := T, payload := Payload}} ->
  1779. ?assertEqual(<<"t2">>, T),
  1780. ?assertEqual(<<"clientid=c_emqx1">>, Payload)
  1781. after 2000 ->
  1782. ct:fail(wait_for_t2)
  1783. end,
  1784. emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0),
  1785. receive
  1786. {publish, #{topic := <<"t2">>, payload := _}} ->
  1787. ct:fail(unexpected_t2)
  1788. after 1000 ->
  1789. ok
  1790. end,
  1791. emqtt:stop(Client),
  1792. delete_rule(TopicRule).
  1793. t_sqlselect_message_publish_event_keep_original_props_1(_Config) ->
  1794. %% republish the client.connected msg
  1795. Topic = <<"foo/bar/1">>,
  1796. SQL = <<
  1797. "SELECT clientid "
  1798. "FROM \"$events/message_dropped\" "
  1799. >>,
  1800. %"WHERE topic = \"", Topic/binary, "\"">>,
  1801. Repub = republish_action(
  1802. <<"t2">>,
  1803. <<"clientid=${clientid}">>,
  1804. <<"${pub_props.'User-Property'}">>
  1805. ),
  1806. {ok, TopicRule} = emqx_rule_engine:create_rule(
  1807. #{
  1808. sql => SQL,
  1809. id => ?TMP_RULEID,
  1810. actions => [Repub]
  1811. }
  1812. ),
  1813. {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
  1814. {ok, _} = emqtt:connect(Client1),
  1815. {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
  1816. {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
  1817. {ok, _} = emqtt:connect(Client2),
  1818. Props = user_properties(#{<<"mykey">> => <<"111111">>}),
  1819. emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
  1820. receive
  1821. {publish, #{topic := T, payload := Payload, properties := Props1}} ->
  1822. ?assertEqual(Props1, Props),
  1823. ?assertEqual(<<"t2">>, T),
  1824. ?assertEqual(<<"clientid=pub-02">>, Payload)
  1825. after 2000 ->
  1826. ct:fail(wait_for_t2)
  1827. end,
  1828. emqtt:stop(Client2),
  1829. emqtt:stop(Client1),
  1830. delete_rule(TopicRule).
  1831. t_sqlselect_message_publish_event_keep_original_props_2(_Config) ->
  1832. %% republish the client.connected msg
  1833. Topic = <<"foo/bar/1">>,
  1834. SQL = <<
  1835. "SELECT clientid, pub_props.'User-Property' as user_properties "
  1836. "FROM \"$events/message_dropped\" "
  1837. >>,
  1838. %"WHERE topic = \"", Topic/binary, "\"">>,
  1839. Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>),
  1840. {ok, TopicRule} = emqx_rule_engine:create_rule(
  1841. #{
  1842. sql => SQL,
  1843. id => ?TMP_RULEID,
  1844. actions => [Repub]
  1845. }
  1846. ),
  1847. {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
  1848. {ok, _} = emqtt:connect(Client1),
  1849. {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
  1850. {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
  1851. {ok, _} = emqtt:connect(Client2),
  1852. Props = user_properties(#{<<"mykey">> => <<"222222222222">>}),
  1853. emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
  1854. receive
  1855. {publish, #{topic := T, payload := Payload, properties := Props1}} ->
  1856. ?assertEqual(Props1, Props),
  1857. ?assertEqual(<<"t2">>, T),
  1858. ?assertEqual(<<"clientid=pub-02">>, Payload)
  1859. after 2000 ->
  1860. ct:fail(wait_for_t2)
  1861. end,
  1862. emqtt:stop(Client2),
  1863. emqtt:stop(Client1),
  1864. delete_rule(TopicRule).
  1865. t_sqlselect_as_put(_Config) ->
  1866. %% Verify SELECT with 'AS' to update the payload
  1867. Sql =
  1868. "select payload, "
  1869. "'STEVE' as payload.data[1].name "
  1870. "from \"t/#\" ",
  1871. PayloadMap = #{
  1872. <<"f1">> => <<"f1">>,
  1873. <<"f2">> => <<"f2">>,
  1874. <<"data">> => [
  1875. #{<<"name">> => <<"n1">>, <<"idx">> => 1},
  1876. #{<<"name">> => <<"n2">>, <<"idx">> => 2}
  1877. ]
  1878. },
  1879. PayloadBin = emqx_utils_json:encode(PayloadMap),
  1880. SqlResult = emqx_rule_sqltester:test(
  1881. #{
  1882. sql => Sql,
  1883. context =>
  1884. #{
  1885. payload => PayloadBin,
  1886. topic => <<"t/a">>
  1887. }
  1888. }
  1889. ),
  1890. ?assertMatch({ok, #{<<"payload">> := _}}, SqlResult),
  1891. {ok, #{<<"payload">> := PayloadMap2}} = SqlResult,
  1892. ?assertMatch(
  1893. #{
  1894. <<"f1">> := <<"f1">>,
  1895. <<"f2">> := <<"f2">>,
  1896. <<"data">> := [
  1897. #{<<"name">> := <<"STEVE">>, <<"idx">> := 1},
  1898. #{<<"name">> := <<"n2">>, <<"idx">> := 2}
  1899. ]
  1900. },
  1901. PayloadMap2
  1902. ).
  1903. t_sqlselect_missing_template_vars_render_as_undefined(_Config) ->
  1904. SQL = <<"SELECT * FROM \"$events/client_connected\"">>,
  1905. Repub = republish_action(<<"t2">>, <<"${clientid}:${missing.var}">>),
  1906. {ok, TopicRule} = emqx_rule_engine:create_rule(
  1907. #{
  1908. sql => SQL,
  1909. id => ?TMP_RULEID,
  1910. actions => [Repub]
  1911. }
  1912. ),
  1913. {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}]),
  1914. {ok, _} = emqtt:connect(Client1),
  1915. {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>),
  1916. {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}]),
  1917. {ok, _} = emqtt:connect(Client2),
  1918. emqtt:publish(Client2, <<"foo/bar/1">>, <<>>),
  1919. receive
  1920. {publish, Msg} ->
  1921. ?assertMatch(#{topic := <<"t2">>, payload := <<"pub-02:undefined">>}, Msg)
  1922. after 2000 ->
  1923. ct:fail(wait_for_t2)
  1924. end,
  1925. emqtt:stop(Client2),
  1926. emqtt:stop(Client1),
  1927. delete_rule(TopicRule).
  1928. t_sqlparse_event_1(_Config) ->
  1929. Sql =
  1930. "select topic as tp "
  1931. "from \"$events/session_subscribed\" ",
  1932. ?assertMatch(
  1933. {ok, #{<<"tp">> := <<"t/tt">>}},
  1934. emqx_rule_sqltester:test(
  1935. #{
  1936. sql => Sql,
  1937. context => #{
  1938. topic => <<"t/tt">>,
  1939. event => 'session.subscribed'
  1940. }
  1941. }
  1942. )
  1943. ).
  1944. t_sqlparse_event_2(_Config) ->
  1945. Sql =
  1946. "select clientid "
  1947. "from \"$events/client_connected\" ",
  1948. ?assertMatch(
  1949. {ok, #{<<"clientid">> := <<"abc">>}},
  1950. emqx_rule_sqltester:test(
  1951. #{
  1952. sql => Sql,
  1953. context => #{
  1954. clientid => <<"abc">>,
  1955. event => 'client.connected'
  1956. }
  1957. }
  1958. )
  1959. ).
  1960. t_sqlparse_event_3(_Config) ->
  1961. Sql =
  1962. "select clientid, topic as tp "
  1963. "from \"t/tt\", \"$events/client_connected\" ",
  1964. ?assertMatch(
  1965. {ok, #{<<"clientid">> := <<"abc">>, <<"tp">> := <<"t/tt">>}},
  1966. emqx_rule_sqltester:test(
  1967. #{
  1968. sql => Sql,
  1969. context => #{clientid => <<"abc">>, topic => <<"t/tt">>}
  1970. }
  1971. )
  1972. ).
  1973. t_sqlparse_foreach_1(_Config) ->
  1974. %% Verify foreach with and without 'AS'
  1975. Sql =
  1976. "foreach payload.sensors as s "
  1977. "from \"t/#\" ",
  1978. ?assertMatch(
  1979. {ok, [#{<<"s">> := 1}, #{<<"s">> := 2}]},
  1980. emqx_rule_sqltester:test(
  1981. #{
  1982. sql => Sql,
  1983. context => #{
  1984. payload => <<"{\"sensors\": [1, 2]}">>,
  1985. topic => <<"t/a">>
  1986. }
  1987. }
  1988. )
  1989. ),
  1990. Sql2 =
  1991. "foreach payload.sensors "
  1992. "from \"t/#\" ",
  1993. ?assertMatch(
  1994. {ok, [#{item := 1}, #{item := 2}]},
  1995. emqx_rule_sqltester:test(
  1996. #{
  1997. sql => Sql2,
  1998. context => #{
  1999. payload => <<"{\"sensors\": [1, 2]}">>,
  2000. topic => <<"t/a">>
  2001. }
  2002. }
  2003. )
  2004. ),
  2005. Sql3 =
  2006. "foreach payload.sensors "
  2007. "from \"t/#\" ",
  2008. ?assertMatch(
  2009. {ok, [
  2010. #{item := #{<<"cmd">> := <<"1">>}, clientid := <<"c_a">>},
  2011. #{item := #{<<"cmd">> := <<"2">>, <<"name">> := <<"ct">>}, clientid := <<"c_a">>}
  2012. ]},
  2013. emqx_rule_sqltester:test(
  2014. #{
  2015. sql => Sql3,
  2016. context => #{
  2017. payload =>
  2018. <<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\",\"name\":\"ct\"}]}">>,
  2019. clientid => <<"c_a">>,
  2020. topic => <<"t/a">>
  2021. }
  2022. }
  2023. )
  2024. ),
  2025. Sql4 =
  2026. "foreach payload.sensors "
  2027. "from \"t/#\" ",
  2028. {ok, [
  2029. #{metadata := #{rule_id := TRuleId}},
  2030. #{metadata := #{rule_id := TRuleId}}
  2031. ]} =
  2032. emqx_rule_sqltester:test(
  2033. #{
  2034. sql => Sql4,
  2035. context => #{
  2036. payload => <<"{\"sensors\": [1, 2]}">>,
  2037. topic => <<"t/a">>
  2038. }
  2039. }
  2040. ),
  2041. Sql5 =
  2042. "foreach payload.sensors "
  2043. "from \"t/#\" ",
  2044. {ok, [
  2045. #{payload := #{<<"sensors">> := _}},
  2046. #{payload := #{<<"sensors">> := _}}
  2047. ]} =
  2048. emqx_rule_sqltester:test(
  2049. #{
  2050. sql => Sql5,
  2051. context => #{
  2052. payload => <<"{\"sensors\": [1, 2]}">>,
  2053. topic => <<"t/a">>
  2054. }
  2055. }
  2056. ),
  2057. try
  2058. meck:new(emqx_rule_runtime, [non_strict, passthrough]),
  2059. meck:expect(
  2060. emqx_rule_runtime,
  2061. apply_rule,
  2062. fun(Rule, #{payload := Payload} = Columns, Env) ->
  2063. Columns2 = maps:put(<<"payload">>, Payload, maps:without([payload], Columns)),
  2064. meck:passthrough([Rule, Columns2, Env])
  2065. end
  2066. ),
  2067. Sql6 =
  2068. "foreach payload.sensors "
  2069. "from \"t/#\" ",
  2070. {ok, [
  2071. #{<<"payload">> := #{<<"sensors">> := _}},
  2072. #{<<"payload">> := #{<<"sensors">> := _}}
  2073. ]} =
  2074. emqx_rule_sqltester:test(
  2075. #{
  2076. sql => Sql6,
  2077. context => #{
  2078. <<"payload">> => <<"{\"sensors\": [1, 2]}">>,
  2079. topic => <<"t/a">>
  2080. }
  2081. }
  2082. ),
  2083. Sql7 =
  2084. "foreach payload.sensors "
  2085. "from \"t/#\" ",
  2086. ?assertNotMatch(
  2087. {ok, [
  2088. #{<<"payload">> := _, payload := _},
  2089. #{<<"payload">> := _, payload := _}
  2090. ]},
  2091. emqx_rule_sqltester:test(
  2092. #{
  2093. sql => Sql7,
  2094. context => #{
  2095. <<"payload">> => <<"{\"sensors\": [1, 2]}">>,
  2096. topic => <<"t/a">>
  2097. }
  2098. }
  2099. )
  2100. )
  2101. after
  2102. meck:unload(emqx_rule_runtime)
  2103. end,
  2104. ?assert(is_binary(TRuleId)).
  2105. t_sqlparse_foreach_2(_Config) ->
  2106. %% Verify foreach-do with and without 'AS'
  2107. Sql =
  2108. "foreach payload.sensors as s "
  2109. "do s.cmd as msg_type "
  2110. "from \"t/#\" ",
  2111. ?assertMatch(
  2112. {ok, [#{<<"msg_type">> := <<"1">>}, #{<<"msg_type">> := <<"2">>}]},
  2113. emqx_rule_sqltester:test(
  2114. #{
  2115. sql => Sql,
  2116. context =>
  2117. #{
  2118. payload =>
  2119. <<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
  2120. topic => <<"t/a">>
  2121. }
  2122. }
  2123. )
  2124. ),
  2125. Sql2 =
  2126. "foreach payload.sensors "
  2127. "do item.cmd as msg_type "
  2128. "from \"t/#\" ",
  2129. ?assertMatch(
  2130. {ok, [#{<<"msg_type">> := <<"1">>}, #{<<"msg_type">> := <<"2">>}]},
  2131. emqx_rule_sqltester:test(
  2132. #{
  2133. sql => Sql2,
  2134. context =>
  2135. #{
  2136. payload =>
  2137. <<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
  2138. topic => <<"t/a">>
  2139. }
  2140. }
  2141. )
  2142. ),
  2143. Sql3 =
  2144. "foreach payload.sensors "
  2145. "do item as item "
  2146. "from \"t/#\" ",
  2147. ?assertMatch(
  2148. {ok, [#{<<"item">> := 1}, #{<<"item">> := 2}]},
  2149. emqx_rule_sqltester:test(
  2150. #{
  2151. sql => Sql3,
  2152. context =>
  2153. #{
  2154. payload =>
  2155. <<"{\"sensors\": [1, 2]}">>,
  2156. topic => <<"t/a">>
  2157. }
  2158. }
  2159. )
  2160. ).
  2161. t_sqlparse_foreach_3(_Config) ->
  2162. %% Verify foreach-incase with and without 'AS'
  2163. Sql =
  2164. "foreach payload.sensors as s "
  2165. "incase s.cmd != 1 "
  2166. "from \"t/#\" ",
  2167. ?assertMatch(
  2168. {ok, [
  2169. #{<<"s">> := #{<<"cmd">> := 2}},
  2170. #{<<"s">> := #{<<"cmd">> := 3}}
  2171. ]},
  2172. emqx_rule_sqltester:test(
  2173. #{
  2174. sql => Sql,
  2175. context =>
  2176. #{
  2177. payload =>
  2178. <<"{\"sensors\": [{\"cmd\":1}, {\"cmd\":2}, {\"cmd\":3}]}">>,
  2179. topic => <<"t/a">>
  2180. }
  2181. }
  2182. )
  2183. ),
  2184. Sql2 =
  2185. "foreach payload.sensors "
  2186. "incase item.cmd != 1 "
  2187. "from \"t/#\" ",
  2188. ?assertMatch(
  2189. {ok, [
  2190. #{item := #{<<"cmd">> := 2}},
  2191. #{item := #{<<"cmd">> := 3}}
  2192. ]},
  2193. emqx_rule_sqltester:test(
  2194. #{
  2195. sql => Sql2,
  2196. context =>
  2197. #{
  2198. payload =>
  2199. <<"{\"sensors\": [{\"cmd\":1}, {\"cmd\":2}, {\"cmd\":3}]}">>,
  2200. topic => <<"t/a">>
  2201. }
  2202. }
  2203. )
  2204. ).
  2205. t_sqlparse_foreach_4(_Config) ->
  2206. %% Verify foreach-do-incase
  2207. Sql =
  2208. "foreach payload.sensors as s "
  2209. "do s.cmd as msg_type, s.name as name "
  2210. "incase is_not_null(s.cmd) "
  2211. "from \"t/#\" ",
  2212. ?assertMatch(
  2213. {ok, [#{<<"msg_type">> := <<"1">>}, #{<<"msg_type">> := <<"2">>}]},
  2214. emqx_rule_sqltester:test(
  2215. #{
  2216. sql => Sql,
  2217. context =>
  2218. #{
  2219. payload =>
  2220. <<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
  2221. topic => <<"t/a">>
  2222. }
  2223. }
  2224. )
  2225. ),
  2226. ?assertMatch(
  2227. {ok, [#{<<"msg_type">> := <<"1">>, <<"name">> := <<"n1">>}, #{<<"msg_type">> := <<"2">>}]},
  2228. emqx_rule_sqltester:test(
  2229. #{
  2230. sql => Sql,
  2231. context =>
  2232. #{
  2233. payload =>
  2234. <<"{\"sensors\": [{\"cmd\":\"1\", \"name\":\"n1\"}, {\"cmd\":\"2\"}, {\"name\":\"n3\"}]}">>,
  2235. topic => <<"t/a">>
  2236. }
  2237. }
  2238. )
  2239. ),
  2240. ?assertMatch(
  2241. {ok, []},
  2242. emqx_rule_sqltester:test(
  2243. #{
  2244. sql => Sql,
  2245. context =>
  2246. #{
  2247. payload => <<"{\"sensors\": [1, 2]}">>,
  2248. topic => <<"t/a">>
  2249. }
  2250. }
  2251. )
  2252. ).
  2253. t_sqlparse_foreach_5(_Config) ->
  2254. %% Verify foreach on a empty-list or non-list variable
  2255. Sql =
  2256. "foreach payload.sensors as s "
  2257. "do s.cmd as msg_type, s.name as name "
  2258. "from \"t/#\" ",
  2259. ?assertMatch(
  2260. {ok, []},
  2261. emqx_rule_sqltester:test(
  2262. #{
  2263. sql => Sql,
  2264. context =>
  2265. #{
  2266. payload => <<"{\"sensors\": 1}">>,
  2267. topic => <<"t/a">>
  2268. }
  2269. }
  2270. )
  2271. ),
  2272. ?assertMatch(
  2273. {ok, []},
  2274. emqx_rule_sqltester:test(
  2275. #{
  2276. sql => Sql,
  2277. context =>
  2278. #{
  2279. payload => <<"{\"sensors\": []}">>,
  2280. topic => <<"t/a">>
  2281. }
  2282. }
  2283. )
  2284. ),
  2285. Sql2 =
  2286. "foreach payload.sensors "
  2287. "from \"t/#\" ",
  2288. ?assertMatch(
  2289. {ok, []},
  2290. emqx_rule_sqltester:test(
  2291. #{
  2292. sql => Sql2,
  2293. context =>
  2294. #{
  2295. payload => <<"{\"sensors\": 1}">>,
  2296. topic => <<"t/a">>
  2297. }
  2298. }
  2299. )
  2300. ).
  2301. t_sqlparse_foreach_6(_Config) ->
  2302. %% Verify foreach on a empty-list or non-list variable
  2303. Sql =
  2304. "foreach json_decode(payload) "
  2305. "do item.id as zid, timestamp as t "
  2306. "from \"t/#\" ",
  2307. {ok, Res} = emqx_rule_sqltester:test(
  2308. #{
  2309. sql => Sql,
  2310. context =>
  2311. #{
  2312. payload => <<"[{\"id\": 5},{\"id\": 15}]">>,
  2313. topic => <<"t/a">>
  2314. }
  2315. }
  2316. ),
  2317. [
  2318. #{<<"t">> := Ts1, <<"zid">> := Zid1},
  2319. #{<<"t">> := Ts2, <<"zid">> := Zid2}
  2320. ] = Res,
  2321. ?assertEqual(true, is_integer(Ts1)),
  2322. ?assertEqual(true, is_integer(Ts2)),
  2323. ?assert(Zid1 == 5 orelse Zid1 == 15),
  2324. ?assert(Zid2 == 5 orelse Zid2 == 15).
  2325. t_sqlparse_foreach_7(_Config) ->
  2326. %% Verify foreach-do-incase and cascaded AS
  2327. Sql =
  2328. "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
  2329. "do info.cmd as msg_type, info.name as name "
  2330. "incase is_not_null(info.cmd) "
  2331. "from \"t/#\" "
  2332. "where s.page = '2' ",
  2333. Payload = <<
  2334. "{\"sensors\": {\"page\": 2, \"collection\": "
  2335. "{\"info\":[{\"name\":\"cmd1\", \"cmd\":\"1\"}, {\"cmd\":\"2\"}]} } }"
  2336. >>,
  2337. ?assertMatch(
  2338. {ok, [#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}, #{<<"msg_type">> := <<"2">>}]},
  2339. emqx_rule_sqltester:test(
  2340. #{
  2341. sql => Sql,
  2342. context =>
  2343. #{
  2344. payload => Payload,
  2345. topic => <<"t/a">>
  2346. }
  2347. }
  2348. )
  2349. ),
  2350. Sql2 =
  2351. "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
  2352. "do info.cmd as msg_type, info.name as name "
  2353. "incase is_not_null(info.cmd) "
  2354. "from \"t/#\" "
  2355. "where s.page = '3' ",
  2356. ?assertMatch(
  2357. {error, nomatch},
  2358. emqx_rule_sqltester:test(
  2359. #{
  2360. sql => Sql2,
  2361. context =>
  2362. #{
  2363. payload => Payload,
  2364. topic => <<"t/a">>
  2365. }
  2366. }
  2367. )
  2368. ).
  2369. -define(COLL, #{<<"info">> := [<<"haha">>, #{<<"name">> := <<"cmd1">>, <<"cmd">> := <<"1">>}]}).
  2370. t_sqlparse_foreach_8(_Config) ->
  2371. %% Verify foreach-do-incase and cascaded AS
  2372. Sql =
  2373. "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
  2374. "do info.cmd as msg_type, info.name as name, s, c "
  2375. "incase is_map(info) "
  2376. "from \"t/#\" "
  2377. "where s.page = '2' ",
  2378. Payload = <<
  2379. "{\"sensors\": {\"page\": 2, \"collection\": "
  2380. "{\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }"
  2381. >>,
  2382. ?assertMatch(
  2383. {ok, [
  2384. #{
  2385. <<"name">> := <<"cmd1">>,
  2386. <<"msg_type">> := <<"1">>,
  2387. <<"s">> := #{<<"page">> := 2, <<"collection">> := ?COLL},
  2388. <<"c">> := ?COLL
  2389. }
  2390. ]},
  2391. emqx_rule_sqltester:test(
  2392. #{
  2393. sql => Sql,
  2394. context =>
  2395. #{
  2396. payload => Payload,
  2397. topic => <<"t/a">>
  2398. }
  2399. }
  2400. )
  2401. ),
  2402. Sql3 =
  2403. "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, sublist(2,1,c.info) as info "
  2404. "do info.cmd as msg_type, info.name as name "
  2405. "from \"t/#\" "
  2406. "where s.page = '2' ",
  2407. [
  2408. ?assertMatch(
  2409. {ok, [#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]},
  2410. emqx_rule_sqltester:test(
  2411. #{
  2412. sql => SqlN,
  2413. context =>
  2414. #{
  2415. payload => Payload,
  2416. topic => <<"t/a">>
  2417. }
  2418. }
  2419. )
  2420. )
  2421. || SqlN <- [Sql3]
  2422. ].
  2423. t_sqlparse_foreach_9(_Config) ->
  2424. Sql1 =
  2425. "foreach json_decode(payload) as p "
  2426. "do p.ts as ts "
  2427. "from \"t/#\" ",
  2428. Context = #{
  2429. payload =>
  2430. emqx_utils_json:encode(
  2431. [
  2432. #{
  2433. <<"ts">> => 1451649600512,
  2434. <<"values">> =>
  2435. #{
  2436. <<"respiratoryrate">> => 20,
  2437. <<"heartrate">> => 130,
  2438. <<"systolic">> => 50
  2439. }
  2440. }
  2441. ]
  2442. ),
  2443. topic => <<"t/a">>
  2444. },
  2445. ?assertMatch(
  2446. {ok, [#{<<"ts">> := 1451649600512}]},
  2447. emqx_rule_sqltester:test(
  2448. #{
  2449. sql => Sql1,
  2450. context => Context
  2451. }
  2452. )
  2453. ),
  2454. %% doesn't work if we don't decode it first
  2455. Sql2 =
  2456. "foreach payload as p "
  2457. "do p.ts as ts "
  2458. "from \"t/#\" ",
  2459. ?assertMatch(
  2460. {ok, []},
  2461. emqx_rule_sqltester:test(
  2462. #{
  2463. sql => Sql2,
  2464. context => Context
  2465. }
  2466. )
  2467. ),
  2468. ok.
  2469. t_sqlparse_case_when_1(_Config) ->
  2470. %% case-when-else clause
  2471. Sql =
  2472. "select "
  2473. " case when payload.x < 0 then 0 "
  2474. " when payload.x > 7 then 7 "
  2475. " else payload.x "
  2476. " end as y "
  2477. "from \"t/#\" ",
  2478. ?assertMatch(
  2479. {ok, #{<<"y">> := 1}},
  2480. emqx_rule_sqltester:test(
  2481. #{
  2482. sql => Sql,
  2483. context => #{
  2484. payload => <<"{\"x\": 1}">>,
  2485. topic => <<"t/a">>
  2486. }
  2487. }
  2488. )
  2489. ),
  2490. ?assertMatch(
  2491. {ok, #{<<"y">> := 0}},
  2492. emqx_rule_sqltester:test(
  2493. #{
  2494. sql => Sql,
  2495. context => #{
  2496. payload => <<"{\"x\": 0}">>,
  2497. topic => <<"t/a">>
  2498. }
  2499. }
  2500. )
  2501. ),
  2502. ?assertMatch(
  2503. {ok, #{<<"y">> := 0}},
  2504. emqx_rule_sqltester:test(
  2505. #{
  2506. sql => Sql,
  2507. context => #{
  2508. payload => <<"{\"x\": -1}">>,
  2509. topic => <<"t/a">>
  2510. }
  2511. }
  2512. )
  2513. ),
  2514. ?assertMatch(
  2515. {ok, #{<<"y">> := 7}},
  2516. emqx_rule_sqltester:test(
  2517. #{
  2518. sql => Sql,
  2519. context => #{
  2520. payload => <<"{\"x\": 7}">>,
  2521. topic => <<"t/a">>
  2522. }
  2523. }
  2524. )
  2525. ),
  2526. ?assertMatch(
  2527. {ok, #{<<"y">> := 7}},
  2528. emqx_rule_sqltester:test(
  2529. #{
  2530. sql => Sql,
  2531. context => #{
  2532. payload => <<"{\"x\": 8}">>,
  2533. topic => <<"t/a">>
  2534. }
  2535. }
  2536. )
  2537. ),
  2538. ok.
  2539. t_sqlparse_case_when_2(_Config) ->
  2540. % switch clause
  2541. Sql =
  2542. "select "
  2543. " case payload.x when 1 then 2 "
  2544. " when 2 then 3 "
  2545. " else 4 "
  2546. " end as y "
  2547. "from \"t/#\" ",
  2548. ?assertMatch(
  2549. {ok, #{<<"y">> := 2}},
  2550. emqx_rule_sqltester:test(
  2551. #{
  2552. sql => Sql,
  2553. context => #{
  2554. payload => <<"{\"x\": 1}">>,
  2555. topic => <<"t/a">>
  2556. }
  2557. }
  2558. )
  2559. ),
  2560. ?assertMatch(
  2561. {ok, #{<<"y">> := 3}},
  2562. emqx_rule_sqltester:test(
  2563. #{
  2564. sql => Sql,
  2565. context => #{
  2566. payload => <<"{\"x\": 2}">>,
  2567. topic => <<"t/a">>
  2568. }
  2569. }
  2570. )
  2571. ),
  2572. ?assertMatch(
  2573. {ok, #{<<"y">> := 4}},
  2574. emqx_rule_sqltester:test(
  2575. #{
  2576. sql => Sql,
  2577. context => #{
  2578. payload => <<"{\"x\": 4}">>,
  2579. topic => <<"t/a">>
  2580. }
  2581. }
  2582. )
  2583. ),
  2584. ?assertMatch(
  2585. {ok, #{<<"y">> := 4}},
  2586. emqx_rule_sqltester:test(
  2587. #{
  2588. sql => Sql,
  2589. context => #{
  2590. payload => <<"{\"x\": 7}">>,
  2591. topic => <<"t/a">>
  2592. }
  2593. }
  2594. )
  2595. ),
  2596. ?assertMatch(
  2597. {ok, #{<<"y">> := 4}},
  2598. emqx_rule_sqltester:test(
  2599. #{
  2600. sql => Sql,
  2601. context => #{
  2602. payload => <<"{\"x\": 8}">>,
  2603. topic => <<"t/a">>
  2604. }
  2605. }
  2606. )
  2607. ).
  2608. t_sqlparse_case_when_3(_Config) ->
  2609. %% case-when clause
  2610. Sql =
  2611. "select "
  2612. " case when payload.x < 0 then 0 "
  2613. " when payload.x > 7 then 7 "
  2614. " end as y "
  2615. "from \"t/#\" ",
  2616. ?assertMatch(
  2617. {ok, #{}},
  2618. emqx_rule_sqltester:test(
  2619. #{
  2620. sql => Sql,
  2621. context => #{
  2622. payload => <<"{\"x\": 1}">>,
  2623. topic => <<"t/a">>
  2624. }
  2625. }
  2626. )
  2627. ),
  2628. ?assertMatch(
  2629. {ok, #{}},
  2630. emqx_rule_sqltester:test(
  2631. #{
  2632. sql => Sql,
  2633. context => #{
  2634. payload => <<"{\"x\": 5}">>,
  2635. topic => <<"t/a">>
  2636. }
  2637. }
  2638. )
  2639. ),
  2640. ?assertMatch(
  2641. {ok, #{}},
  2642. emqx_rule_sqltester:test(
  2643. #{
  2644. sql => Sql,
  2645. context => #{
  2646. payload => <<"{\"x\": 0}">>,
  2647. topic => <<"t/a">>
  2648. }
  2649. }
  2650. )
  2651. ),
  2652. ?assertMatch(
  2653. {ok, #{<<"y">> := 0}},
  2654. emqx_rule_sqltester:test(
  2655. #{
  2656. sql => Sql,
  2657. context => #{
  2658. payload => <<"{\"x\": -1}">>,
  2659. topic => <<"t/a">>
  2660. }
  2661. }
  2662. )
  2663. ),
  2664. ?assertMatch(
  2665. {ok, #{}},
  2666. emqx_rule_sqltester:test(
  2667. #{
  2668. sql => Sql,
  2669. context => #{
  2670. payload => <<"{\"x\": 7}">>,
  2671. topic => <<"t/a">>
  2672. }
  2673. }
  2674. )
  2675. ),
  2676. ?assertMatch(
  2677. {ok, #{<<"y">> := 7}},
  2678. emqx_rule_sqltester:test(
  2679. #{
  2680. sql => Sql,
  2681. context => #{
  2682. payload => <<"{\"x\": 8}">>,
  2683. topic => <<"t/a">>
  2684. }
  2685. }
  2686. )
  2687. ),
  2688. ok.
  2689. t_sqlparse_array_index_1(_Config) ->
  2690. %% index get
  2691. Sql =
  2692. "select "
  2693. " json_decode(payload) as p, "
  2694. " p[1] as a "
  2695. "from \"t/#\" ",
  2696. ?assertMatch(
  2697. {ok, #{<<"a">> := #{<<"x">> := 1}}},
  2698. emqx_rule_sqltester:test(
  2699. #{
  2700. sql => Sql,
  2701. context => #{
  2702. payload => <<"[{\"x\": 1}]">>,
  2703. topic => <<"t/a">>
  2704. }
  2705. }
  2706. )
  2707. ),
  2708. ?assertMatch(
  2709. {ok, #{}},
  2710. emqx_rule_sqltester:test(
  2711. #{
  2712. sql => Sql,
  2713. context => #{
  2714. payload => <<"{\"x\": 1}">>,
  2715. topic => <<"t/a">>
  2716. }
  2717. }
  2718. )
  2719. ),
  2720. %% index get without 'as'
  2721. Sql2 =
  2722. "select "
  2723. " payload.x[2] "
  2724. "from \"t/#\" ",
  2725. ?assertMatch(
  2726. {ok, #{payload := #{<<"x">> := [3]}}},
  2727. emqx_rule_sqltester:test(
  2728. #{
  2729. sql => Sql2,
  2730. context => #{
  2731. payload => #{<<"x">> => [1, 3, 4]},
  2732. topic => <<"t/a">>
  2733. }
  2734. }
  2735. )
  2736. ),
  2737. %% index get without 'as' again
  2738. Sql3 =
  2739. "select "
  2740. " payload.x[2].y "
  2741. "from \"t/#\" ",
  2742. ?assertMatch(
  2743. {ok, #{payload := #{<<"x">> := [#{<<"y">> := 3}]}}},
  2744. emqx_rule_sqltester:test(
  2745. #{
  2746. sql => Sql3,
  2747. context => #{
  2748. payload => #{<<"x">> => [1, #{y => 3}, 4]},
  2749. topic => <<"t/a">>
  2750. }
  2751. }
  2752. )
  2753. ),
  2754. %% index get with 'as'
  2755. Sql4 =
  2756. "select "
  2757. " payload.x[2].y as b "
  2758. "from \"t/#\" ",
  2759. ?assertMatch(
  2760. {ok, #{<<"b">> := 3}},
  2761. emqx_rule_sqltester:test(
  2762. #{
  2763. sql => Sql4,
  2764. context => #{
  2765. payload => #{<<"x">> => [1, #{y => 3}, 4]},
  2766. topic => <<"t/a">>
  2767. }
  2768. }
  2769. )
  2770. ).
  2771. t_sqlparse_array_index_2(_Config) ->
  2772. %% array get with negative index
  2773. Sql1 =
  2774. "select "
  2775. " payload.x[-2].y as b "
  2776. "from \"t/#\" ",
  2777. ?assertMatch(
  2778. {ok, #{<<"b">> := 3}},
  2779. emqx_rule_sqltester:test(
  2780. #{
  2781. sql => Sql1,
  2782. context => #{
  2783. payload => #{<<"x">> => [1, #{y => 3}, 4]},
  2784. topic => <<"t/a">>
  2785. }
  2786. }
  2787. )
  2788. ),
  2789. %% array append to head or tail of a list:
  2790. Sql2 =
  2791. "select "
  2792. " payload.x as b, "
  2793. " 1 as c[-0], "
  2794. " 2 as c[-0], "
  2795. " b as c[0] "
  2796. "from \"t/#\" ",
  2797. ?assertMatch(
  2798. {ok, #{<<"b">> := 0, <<"c">> := [0, 1, 2]}},
  2799. emqx_rule_sqltester:test(
  2800. #{
  2801. sql => Sql2,
  2802. context => #{
  2803. payload => #{<<"x">> => 0},
  2804. topic => <<"t/a">>
  2805. }
  2806. }
  2807. )
  2808. ),
  2809. %% construct an empty list:
  2810. Sql3 =
  2811. "select "
  2812. " [] as c, "
  2813. " 1 as c[-0], "
  2814. " 2 as c[-0], "
  2815. " 0 as c[0] "
  2816. "from \"t/#\" ",
  2817. ?assertMatch(
  2818. {ok, #{<<"c">> := [0, 1, 2]}},
  2819. emqx_rule_sqltester:test(
  2820. #{
  2821. sql => Sql3,
  2822. context => #{
  2823. payload => <<"">>,
  2824. topic => <<"t/a">>
  2825. }
  2826. }
  2827. )
  2828. ),
  2829. %% construct a list:
  2830. Sql4 =
  2831. "select "
  2832. " [payload.a, \"topic\", 'c'] as c, "
  2833. " 1 as c[-0], "
  2834. " 2 as c[-0], "
  2835. " 0 as c[0] "
  2836. "from \"t/#\" ",
  2837. ?assertMatch(
  2838. {ok, #{<<"c">> := [0, 11, <<"t/a">>, <<"c">>, 1, 2]}},
  2839. emqx_rule_sqltester:test(
  2840. #{
  2841. sql => Sql4,
  2842. context => #{
  2843. payload => <<"{\"a\":11}">>,
  2844. topic => <<"t/a">>
  2845. }
  2846. }
  2847. )
  2848. ).
  2849. t_sqlparse_array_index_3(_Config) ->
  2850. %% array with json string payload:
  2851. Sql0 =
  2852. "select "
  2853. "payload,"
  2854. "payload.x[2].y "
  2855. "from \"t/#\" ",
  2856. ?assertMatch(
  2857. {ok, #{<<"payload">> := #{<<"x">> := [1, #{<<"y">> := [1, 2]}, 3]}}},
  2858. emqx_rule_sqltester:test(
  2859. #{
  2860. sql => Sql0,
  2861. context => #{
  2862. payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
  2863. topic => <<"t/a">>
  2864. }
  2865. }
  2866. )
  2867. ),
  2868. %% same as above but don't select payload:
  2869. Sql1 =
  2870. "select "
  2871. "payload.x[2].y as b "
  2872. "from \"t/#\" ",
  2873. ?assertMatch(
  2874. {ok, #{<<"b">> := [1, 2]}},
  2875. emqx_rule_sqltester:test(
  2876. #{
  2877. sql => Sql1,
  2878. context => #{
  2879. payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
  2880. topic => <<"t/a">>
  2881. }
  2882. }
  2883. )
  2884. ),
  2885. %% same as above but add 'as' clause:
  2886. Sql2 =
  2887. "select "
  2888. "payload.x[2].y as b.c "
  2889. "from \"t/#\" ",
  2890. ?assertMatch(
  2891. {ok, #{<<"b">> := #{<<"c">> := [1, 2]}}},
  2892. emqx_rule_sqltester:test(
  2893. #{
  2894. sql => Sql2,
  2895. context => #{
  2896. payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
  2897. topic => <<"t/a">>
  2898. }
  2899. }
  2900. )
  2901. ).
  2902. t_sqlparse_array_index_4(_Config) ->
  2903. %% array with json string payload:
  2904. Sql0 =
  2905. "select "
  2906. "0 as payload.x[2].y "
  2907. "from \"t/#\" ",
  2908. ?assertMatch(
  2909. {ok, #{<<"payload">> := #{<<"x">> := [#{<<"y">> := 0}]}}},
  2910. emqx_rule_sqltester:test(
  2911. #{
  2912. sql => Sql0,
  2913. context => #{
  2914. payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
  2915. topic => <<"t/a">>
  2916. }
  2917. }
  2918. )
  2919. ),
  2920. %% array with json string payload, and also select payload.x:
  2921. Sql1 =
  2922. "select "
  2923. "payload.x, "
  2924. "0 as payload.x[2].y "
  2925. "from \"t/#\" ",
  2926. ?assertMatch(
  2927. {ok, #{payload := #{<<"x">> := [1, #{<<"y">> := 0}, 3]}}},
  2928. emqx_rule_sqltester:test(
  2929. #{
  2930. sql => Sql1,
  2931. context => #{
  2932. payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
  2933. topic => <<"t/a">>
  2934. }
  2935. }
  2936. )
  2937. ).
  2938. t_sqlparse_array_index_5(_Config) ->
  2939. Sql00 =
  2940. "select "
  2941. " [1,2,3,4] "
  2942. "from \"t/#\" ",
  2943. {ok, Res00} =
  2944. emqx_rule_sqltester:test(
  2945. #{
  2946. sql => Sql00,
  2947. context => #{
  2948. payload => <<"">>,
  2949. topic => <<"t/a">>
  2950. }
  2951. }
  2952. ),
  2953. ?assert(
  2954. lists:any(
  2955. fun({_K, V}) ->
  2956. V =:= [1, 2, 3, 4]
  2957. end,
  2958. maps:to_list(Res00)
  2959. )
  2960. ).
  2961. t_sqlparse_array_with_expressions(_Config) ->
  2962. Sql =
  2963. "select "
  2964. " [21 + 21, abs(-abs(-2)), [1 + 1], 4] "
  2965. "from \"t/#\" ",
  2966. {ok, Res} =
  2967. emqx_rule_sqltester:test(
  2968. #{
  2969. sql => Sql,
  2970. context => #{
  2971. payload => <<"">>,
  2972. topic => <<"t/a">>
  2973. }
  2974. }
  2975. ),
  2976. ?assert(
  2977. lists:any(
  2978. fun({_K, V}) ->
  2979. V =:= [42, 2, [2], 4]
  2980. end,
  2981. maps:to_list(Res)
  2982. )
  2983. ).
  2984. t_sqlparse_select_matadata_1(_Config) ->
  2985. %% array with json string payload:
  2986. Sql0 =
  2987. "select "
  2988. "payload "
  2989. "from \"t/#\" ",
  2990. ?assertNotMatch(
  2991. {ok, #{<<"payload">> := <<"abc">>, metadata := _}},
  2992. emqx_rule_sqltester:test(
  2993. #{
  2994. sql => Sql0,
  2995. context => #{
  2996. payload => <<"abc">>,
  2997. topic => <<"t/a">>
  2998. }
  2999. }
  3000. )
  3001. ),
  3002. Sql1 =
  3003. "select "
  3004. "payload, metadata "
  3005. "from \"t/#\" ",
  3006. ?assertMatch(
  3007. {ok, #{<<"payload">> := <<"abc">>, <<"metadata">> := _}},
  3008. emqx_rule_sqltester:test(
  3009. #{
  3010. sql => Sql1,
  3011. context => #{
  3012. payload => <<"abc">>,
  3013. topic => <<"t/a">>
  3014. }
  3015. }
  3016. )
  3017. ).
  3018. t_sqlparse_array_range_1(_Config) ->
  3019. %% get a range of list
  3020. Sql0 =
  3021. "select "
  3022. " payload.a[1..4] as c "
  3023. "from \"t/#\" ",
  3024. ?assertMatch(
  3025. {ok, #{<<"c">> := [0, 1, 2, 3]}},
  3026. emqx_rule_sqltester:test(
  3027. #{
  3028. sql => Sql0,
  3029. context => #{
  3030. payload => <<"{\"a\":[0,1,2,3,4,5]}">>,
  3031. topic => <<"t/a">>
  3032. }
  3033. }
  3034. )
  3035. ),
  3036. %% get a range from non-list data
  3037. Sql02 =
  3038. "select "
  3039. " payload.a[1..4] as c "
  3040. "from \"t/#\" ",
  3041. ?assertMatch(
  3042. {error, {select_and_transform_error, {error, {range_get, non_list_data}, _}}},
  3043. emqx_rule_sqltester:test(
  3044. #{
  3045. sql => Sql02,
  3046. context =>
  3047. #{
  3048. payload => <<"{\"x\":[0,1,2,3,4,5]}">>,
  3049. topic => <<"t/a">>
  3050. }
  3051. }
  3052. )
  3053. ),
  3054. %% construct a range:
  3055. Sql1 =
  3056. "select "
  3057. " [1..4] as c, "
  3058. " 5 as c[-0], "
  3059. " 6 as c[-0], "
  3060. " 0 as c[0] "
  3061. "from \"t/#\" ",
  3062. ?assertMatch(
  3063. {ok, #{<<"c">> := [0, 1, 2, 3, 4, 5, 6]}},
  3064. emqx_rule_sqltester:test(
  3065. #{
  3066. sql => Sql1,
  3067. context => #{
  3068. payload => <<"">>,
  3069. topic => <<"t/a">>
  3070. }
  3071. }
  3072. )
  3073. ).
  3074. t_sqlparse_array_range_2(_Config) ->
  3075. %% construct a range without 'as'
  3076. Sql00 =
  3077. "select "
  3078. " [1..4] "
  3079. "from \"t/#\" ",
  3080. {ok, Res00} =
  3081. emqx_rule_sqltester:test(
  3082. #{
  3083. sql => Sql00,
  3084. context => #{
  3085. payload => <<"">>,
  3086. topic => <<"t/a">>
  3087. }
  3088. }
  3089. ),
  3090. ?assert(
  3091. lists:any(
  3092. fun({_K, V}) ->
  3093. V =:= [1, 2, 3, 4]
  3094. end,
  3095. maps:to_list(Res00)
  3096. )
  3097. ),
  3098. %% construct a range without 'as'
  3099. Sql01 =
  3100. "select "
  3101. " a[2..4] "
  3102. "from \"t/#\" ",
  3103. ?assertMatch(
  3104. {ok, #{<<"a">> := [2, 3, 4]}},
  3105. emqx_rule_sqltester:test(
  3106. #{
  3107. sql => Sql01,
  3108. context => #{
  3109. <<"a">> => [1, 2, 3, 4, 5],
  3110. topic => <<"t/a">>
  3111. }
  3112. }
  3113. )
  3114. ),
  3115. %% get a range of list without 'as'
  3116. Sql02 =
  3117. "select "
  3118. " payload.a[1..4] "
  3119. "from \"t/#\" ",
  3120. ?assertMatch(
  3121. {ok, #{payload := #{<<"a">> := [0, 1, 2, 3]}}},
  3122. emqx_rule_sqltester:test(
  3123. #{
  3124. sql => Sql02,
  3125. context => #{
  3126. payload => <<"{\"a\":[0,1,2,3,4,5]}">>,
  3127. topic => <<"t/a">>
  3128. }
  3129. }
  3130. )
  3131. ).
  3132. t_sqlparse_true_false(_Config) ->
  3133. %% construct a range without 'as'
  3134. Sql00 =
  3135. "select "
  3136. " true as a, false as b, "
  3137. " false as x.y, true as c[-0] "
  3138. "from \"t/#\" ",
  3139. {ok, Res00} =
  3140. emqx_rule_sqltester:test(
  3141. #{
  3142. sql => Sql00,
  3143. context => #{
  3144. payload => <<"">>,
  3145. topic => <<"t/a">>
  3146. }
  3147. }
  3148. ),
  3149. ?assertMatch(
  3150. #{
  3151. <<"a">> := true,
  3152. <<"b">> := false,
  3153. <<"x">> := #{<<"y">> := false},
  3154. <<"c">> := [true]
  3155. },
  3156. Res00
  3157. ).
  3158. t_sqlparse_undefined_variable(_Config) ->
  3159. %% undefined == undefined
  3160. Sql00 =
  3161. "select "
  3162. "a, b "
  3163. "from \"t/#\" "
  3164. "where a = b",
  3165. {ok, Res00} = emqx_rule_sqltester:test(
  3166. #{sql => Sql00, context => #{payload => <<"">>, topic => <<"t/a">>}}
  3167. ),
  3168. ?assertEqual(#{<<"a">> => undefined, <<"b">> => undefined}, Res00),
  3169. ?assertEqual(2, map_size(Res00)),
  3170. %% undefined compare to non-undefined variables should return false
  3171. Sql01 =
  3172. "select "
  3173. "a, b "
  3174. "from \"t/#\" "
  3175. "where a > b",
  3176. {error, nomatch} = emqx_rule_sqltester:test(
  3177. #{
  3178. sql => Sql01,
  3179. context => #{payload => <<"{\"b\":1}">>, topic => <<"t/a">>}
  3180. }
  3181. ),
  3182. Sql02 =
  3183. "select "
  3184. "a < b as c "
  3185. "from \"t/#\" ",
  3186. {ok, Res02} = emqx_rule_sqltester:test(
  3187. #{
  3188. sql => Sql02,
  3189. context => #{payload => <<"{\"b\":1}">>, topic => <<"t/a">>}
  3190. }
  3191. ),
  3192. ?assertMatch(#{<<"c">> := false}, Res02).
  3193. t_sqlparse_new_map(_Config) ->
  3194. %% construct a range without 'as'
  3195. Sql00 =
  3196. "select "
  3197. " map_new() as a, map_new() as b, "
  3198. " map_new() as x.y, map_new() as c[-0] "
  3199. "from \"t/#\" ",
  3200. {ok, Res00} =
  3201. emqx_rule_sqltester:test(
  3202. #{
  3203. sql => Sql00,
  3204. context => #{
  3205. payload => <<"">>,
  3206. topic => <<"t/a">>
  3207. }
  3208. }
  3209. ),
  3210. ?assertMatch(
  3211. #{
  3212. <<"a">> := #{},
  3213. <<"b">> := #{},
  3214. <<"x">> := #{<<"y">> := #{}},
  3215. <<"c">> := [#{}]
  3216. },
  3217. Res00
  3218. ).
  3219. t_sqlparse_payload_as(_Config) ->
  3220. %% https://github.com/emqx/emqx/issues/3866
  3221. Sql00 =
  3222. "SELECT "
  3223. " payload, map_get('engineWorkTime', payload.params, -1) as payload.params.engineWorkTime, "
  3224. " map_get('hydOilTem', payload.params, -1) as payload.params.hydOilTem "
  3225. "FROM \"t/#\" ",
  3226. Payload1 =
  3227. <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42, \"hydOilTem\": 30 } }">>,
  3228. {ok, Res01} = emqx_rule_sqltester:test(
  3229. #{
  3230. sql => Sql00,
  3231. context => #{
  3232. payload => Payload1,
  3233. topic => <<"t/a">>
  3234. }
  3235. }
  3236. ),
  3237. ?assertMatch(
  3238. #{
  3239. <<"payload">> := #{
  3240. <<"params">> := #{
  3241. <<"convertTemp">> := 20,
  3242. <<"engineSpeed">> := 42,
  3243. <<"engineWorkTime">> := -1,
  3244. <<"hydOilTem">> := 30
  3245. }
  3246. }
  3247. },
  3248. Res01
  3249. ),
  3250. Payload2 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42 } }">>,
  3251. {ok, Res02} = emqx_rule_sqltester:test(
  3252. #{
  3253. sql => Sql00,
  3254. context => #{
  3255. payload => Payload2,
  3256. topic => <<"t/a">>
  3257. }
  3258. }
  3259. ),
  3260. ?assertMatch(
  3261. #{
  3262. <<"payload">> := #{
  3263. <<"params">> := #{
  3264. <<"convertTemp">> := 20,
  3265. <<"engineSpeed">> := 42,
  3266. <<"engineWorkTime">> := -1,
  3267. <<"hydOilTem">> := -1
  3268. }
  3269. }
  3270. },
  3271. Res02
  3272. ).
  3273. t_sqlparse_nested_get(_Config) ->
  3274. Sql =
  3275. "select payload as p, p.a.b as c "
  3276. "from \"t/#\" ",
  3277. ?assertMatch(
  3278. {ok, #{<<"c">> := 0}},
  3279. emqx_rule_sqltester:test(
  3280. #{
  3281. sql => Sql,
  3282. context => #{
  3283. topic => <<"t/1">>,
  3284. payload => <<"{\"a\": {\"b\": 0}}">>
  3285. }
  3286. }
  3287. )
  3288. ).
  3289. t_sqlparse_invalid_json(_Config) ->
  3290. Sql02 =
  3291. "select "
  3292. " payload.a[1..4] as c "
  3293. "from \"t/#\" ",
  3294. ?assertMatch(
  3295. {error, {select_and_transform_error, {error, {decode_json_failed, _}, _}}},
  3296. emqx_rule_sqltester:test(
  3297. #{
  3298. sql => Sql02,
  3299. context =>
  3300. #{
  3301. payload => <<"{\"x\":[0,1,2,3,}">>,
  3302. topic => <<"t/a">>
  3303. }
  3304. }
  3305. )
  3306. ),
  3307. Sql2 =
  3308. "foreach payload.sensors "
  3309. "do item.cmd as msg_type "
  3310. "from \"t/#\" ",
  3311. ?assertMatch(
  3312. {error, {select_and_collect_error, {error, {decode_json_failed, _}, _}}},
  3313. emqx_rule_sqltester:test(
  3314. #{
  3315. sql => Sql2,
  3316. context =>
  3317. #{
  3318. payload =>
  3319. <<"{\"sensors\": [{\"cmd\":\"1\"} {\"cmd\":}]}">>,
  3320. topic => <<"t/a">>
  3321. }
  3322. }
  3323. )
  3324. ).
  3325. t_sqlparse_both_string_types_in_from(_Config) ->
  3326. %% Here is an SQL select statement with both string types in the FROM clause
  3327. SqlSelect =
  3328. "select clientid, topic as tp "
  3329. "from 't/tt', \"$events/client_connected\" ",
  3330. ?assertMatch(
  3331. {ok, #{<<"clientid">> := <<"abc">>, <<"tp">> := <<"t/tt">>}},
  3332. emqx_rule_sqltester:test(
  3333. #{
  3334. sql => SqlSelect,
  3335. context => #{clientid => <<"abc">>, topic => <<"t/tt">>}
  3336. }
  3337. )
  3338. ),
  3339. %% Here is an SQL foreach statement with both string types in the FROM clause
  3340. SqlForeach =
  3341. "foreach payload.sensors "
  3342. "from 't/#', \"$events/client_connected\" ",
  3343. ?assertMatch(
  3344. {ok, []},
  3345. emqx_rule_sqltester:test(
  3346. #{
  3347. sql => SqlForeach,
  3348. context =>
  3349. #{
  3350. payload => <<"{\"sensors\": 1}">>,
  3351. topic => <<"t/a">>
  3352. }
  3353. }
  3354. )
  3355. ).
  3356. %%------------------------------------------------------------------------------
  3357. %% Test cases for telemetry functions
  3358. %%------------------------------------------------------------------------------
  3359. t_get_basic_usage_info_0(_Config) ->
  3360. ?assertEqual(
  3361. #{
  3362. num_rules => 0,
  3363. referenced_bridges => #{}
  3364. },
  3365. emqx_rule_engine:get_basic_usage_info()
  3366. ),
  3367. ok.
  3368. t_get_basic_usage_info_1(_Config) ->
  3369. {ok, _} =
  3370. emqx_rule_engine:create_rule(
  3371. #{
  3372. id => <<"rule:t_get_basic_usage_info:1">>,
  3373. sql => <<"select 1 from topic">>,
  3374. actions =>
  3375. [
  3376. #{function => <<"erlang:hibernate">>, args => #{}},
  3377. #{function => console},
  3378. <<"webhook:my_webhook">>,
  3379. <<"webhook:my_webhook">>
  3380. ]
  3381. }
  3382. ),
  3383. {ok, _} =
  3384. emqx_rule_engine:create_rule(
  3385. #{
  3386. id => <<"rule:t_get_basic_usage_info:2">>,
  3387. sql => <<"select 1 from topic">>,
  3388. actions =>
  3389. [
  3390. <<"mqtt:my_mqtt_bridge">>,
  3391. <<"webhook:my_webhook">>
  3392. ]
  3393. }
  3394. ),
  3395. ?assertEqual(
  3396. #{
  3397. num_rules => 2,
  3398. referenced_bridges =>
  3399. #{
  3400. mqtt => 1,
  3401. http => 3
  3402. }
  3403. },
  3404. emqx_rule_engine:get_basic_usage_info()
  3405. ),
  3406. ok.
  3407. t_get_rule_ids_by_action_reference_ingress_bridge(_Config) ->
  3408. BridgeId = <<"mqtt:ingress">>,
  3409. RuleId = <<"rule:ingress_bridge_referenced">>,
  3410. {ok, _} =
  3411. emqx_rule_engine:create_rule(
  3412. #{
  3413. id => RuleId,
  3414. sql => <<"select 1 from \"$bridges/", BridgeId/binary, "\"">>,
  3415. actions => [#{function => console}]
  3416. }
  3417. ),
  3418. on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
  3419. ?assertMatch(
  3420. [RuleId],
  3421. emqx_rule_engine:get_rule_ids_by_action(BridgeId)
  3422. ),
  3423. ok.
  3424. %%------------------------------------------------------------------------------
  3425. %% Test cases for rule metrics
  3426. %%------------------------------------------------------------------------------
  3427. -define(BRIDGE_TYPE, <<"mqtt">>).
  3428. -define(BRIDGE_NAME, <<"bridge_over_troubled_water">>).
  3429. -define(BRIDGE_CONFIG(QMODE), #{
  3430. <<"server">> => <<"127.0.0.1:1883">>,
  3431. <<"username">> => <<"user1">>,
  3432. <<"password">> => <<"">>,
  3433. <<"proto_ver">> => <<"v4">>,
  3434. <<"ssl">> => #{<<"enable">> => false},
  3435. <<"egress">> =>
  3436. #{
  3437. <<"local">> =>
  3438. #{
  3439. <<"topic">> => <<"foo/#">>
  3440. },
  3441. <<"remote">> =>
  3442. #{
  3443. <<"topic">> => <<"bar/${topic}">>,
  3444. <<"payload">> => <<"${payload}">>,
  3445. <<"qos">> => <<"${qos}">>,
  3446. <<"retain">> => <<"${retain}">>
  3447. }
  3448. },
  3449. <<"resource_opts">> =>
  3450. #{
  3451. <<"health_check_interval">> => <<"5s">>,
  3452. <<"query_mode">> => QMODE,
  3453. <<"request_ttl">> => <<"3s">>,
  3454. <<"worker_pool_size">> => 1
  3455. }
  3456. }).
  3457. -define(SUCCESSS_METRICS, #{
  3458. matched := 1,
  3459. 'actions.total' := 1,
  3460. 'actions.failed' := 0,
  3461. 'actions.success' := 1
  3462. }).
  3463. -define(FAIL_METRICS, #{
  3464. matched := 1,
  3465. 'actions.total' := 1,
  3466. 'actions.failed' := 1,
  3467. 'actions.success' := 0
  3468. }).
  3469. t_rule_metrics_sync(_Config) ->
  3470. do_test_rule_metrics_success(<<"sync">>).
  3471. t_rule_metrics_async(_Config) ->
  3472. do_test_rule_metrics_success(<<"async">>).
  3473. t_rule_metrics_sync_fail(_Config) ->
  3474. do_test_rule_metrics_fail(<<"sync">>).
  3475. t_rule_metrics_async_fail(_Config) ->
  3476. do_test_rule_metrics_fail(<<"async">>).
  3477. do_test_rule_metrics_success(QMode) ->
  3478. ?assertMatch(
  3479. ?SUCCESSS_METRICS,
  3480. do_test_rule_metrics(QMode)
  3481. ).
  3482. do_test_rule_metrics_fail(QMode) ->
  3483. ?assertMatch(
  3484. ?FAIL_METRICS,
  3485. do_test_rule_metrics(QMode)
  3486. ).
  3487. do_test_rule_metrics(QMode) ->
  3488. BridgeId = create_bridge(?BRIDGE_TYPE, ?BRIDGE_NAME, ?BRIDGE_CONFIG(QMode)),
  3489. RuleId = <<"rule:test_metrics_bridge_action">>,
  3490. {ok, #{id := RuleId}} =
  3491. emqx_rule_engine:create_rule(
  3492. #{
  3493. id => RuleId,
  3494. sql => <<"SELECT * FROM \"topic/#\"">>,
  3495. actions => [BridgeId]
  3496. }
  3497. ),
  3498. timer:sleep(100),
  3499. ?assertMatch(
  3500. #{
  3501. matched := 0,
  3502. 'actions.total' := 0,
  3503. 'actions.failed' := 0,
  3504. 'actions.success' := 0
  3505. },
  3506. emqx_metrics_worker:get_counters(rule_metrics, RuleId)
  3507. ),
  3508. MsgId = emqx_guid:gen(),
  3509. emqx:publish(#message{id = MsgId, topic = <<"topic/test">>, payload = <<"hello">>}),
  3510. timer:sleep(100),
  3511. on_exit(
  3512. fun() ->
  3513. emqx_rule_engine:delete_rule(RuleId),
  3514. emqx_bridge:remove(?BRIDGE_TYPE, ?BRIDGE_NAME)
  3515. end
  3516. ),
  3517. emqx_metrics_worker:get_counters(rule_metrics, RuleId).
  3518. create_bridge(Type, Name, Config) ->
  3519. {ok, _Bridge} = emqx_bridge:create(Type, Name, Config),
  3520. emqx_bridge_resource:bridge_id(Type, Name).
  3521. create_rule(Name, SQL) ->
  3522. Rule = emqx_rule_engine_SUITE:make_simple_rule(Name, SQL),
  3523. {ok, _} = emqx_rule_engine:create_rule(Rule).
  3524. emqtt_client_config() ->
  3525. [
  3526. {host, "localhost"},
  3527. {clientid, <<"client">>},
  3528. {username, <<"testuser">>},
  3529. {password, <<"pass">>}
  3530. ].
  3531. filesync(Name, Type) ->
  3532. ct:sleep(50),
  3533. filesync(Name, Type, 5).
  3534. %% sometime the handler process is not started yet.
  3535. filesync(Name, Type, 0) ->
  3536. ct:fail("Handler process not started ~p ~p", [Name, Type]);
  3537. filesync(Name0, Type, Retry) ->
  3538. Name =
  3539. case is_binary(Name0) of
  3540. true -> Name0;
  3541. false -> list_to_binary(Name0)
  3542. end,
  3543. try
  3544. Handler = binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>),
  3545. ok = logger_disk_log_h:filesync(Handler)
  3546. catch
  3547. E:R ->
  3548. ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]),
  3549. ct:sleep(100),
  3550. filesync(Name, Type, Retry - 1)
  3551. end.
  3552. t_trace_rule_id(_Config) ->
  3553. %% Start MQTT Client
  3554. emqx_trace_SUITE:reload(),
  3555. {ok, T} = emqtt:start_link(emqtt_client_config()),
  3556. emqtt:connect(T),
  3557. %% Create rules
  3558. create_rule(
  3559. <<"test_rule_id_1">>,
  3560. <<"select 1 as rule_number from \"rule_1_topic\"">>
  3561. ),
  3562. create_rule(
  3563. <<"test_rule_id_2">>,
  3564. <<"select 2 as rule_number from \"rule_2_topic\"">>
  3565. ),
  3566. %% Start tracing
  3567. ok = emqx_trace_handler:install(
  3568. "CLI-RULE-1", ruleid, <<"test_rule_id_1">>, all, "tmp/rule_trace_1.log"
  3569. ),
  3570. ok = emqx_trace_handler:install(
  3571. "CLI-RULE-2", ruleid, <<"test_rule_id_2">>, all, "tmp/rule_trace_2.log"
  3572. ),
  3573. emqx_trace:check(),
  3574. ok = filesync("CLI-RULE-1", ruleid),
  3575. ok = filesync("CLI-RULE-2", ruleid),
  3576. %% Verify the tracing file exits
  3577. ?assert(filelib:is_regular("tmp/rule_trace_1.log")),
  3578. ?assert(filelib:is_regular("tmp/rule_trace_2.log")),
  3579. %% Get current traces
  3580. ?assertMatch(
  3581. [
  3582. #{
  3583. type := ruleid,
  3584. filter := <<"test_rule_id_1">>,
  3585. level := debug,
  3586. dst := "tmp/rule_trace_1.log",
  3587. name := <<"CLI-RULE-1">>
  3588. },
  3589. #{
  3590. type := ruleid,
  3591. filter := <<"test_rule_id_2">>,
  3592. name := <<"CLI-RULE-2">>,
  3593. level := debug,
  3594. dst := "tmp/rule_trace_2.log"
  3595. }
  3596. ],
  3597. emqx_trace_handler:running()
  3598. ),
  3599. %% Trigger rule
  3600. emqtt:publish(T, <<"rule_1_topic">>, <<"my_traced_message">>),
  3601. ?retry(
  3602. 100,
  3603. 5,
  3604. begin
  3605. ok = filesync("CLI-RULE-1", ruleid),
  3606. {ok, Bin} = file:read_file("tmp/rule_trace_1.log"),
  3607. ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_traced_message">>]))
  3608. end
  3609. ),
  3610. ok = filesync("CLI-RULE-2", ruleid),
  3611. ?assert(filelib:file_size("tmp/rule_trace_2.log") =:= 0),
  3612. %% Stop tracing
  3613. ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-1">>),
  3614. ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-2">>),
  3615. ?assertEqual([], emqx_trace_handler:running()),
  3616. emqtt:disconnect(T).
  3617. %%------------------------------------------------------------------------------
  3618. %% Internal helpers
  3619. %%------------------------------------------------------------------------------
  3620. republish_action(Topic) ->
  3621. republish_action(Topic, <<"${payload}">>).
  3622. republish_action(Topic, Payload) ->
  3623. republish_action(Topic, Payload, <<"${user_properties}">>).
  3624. republish_action(Topic, Payload, UserProperties) ->
  3625. republish_action(Topic, Payload, UserProperties, _MQTTProperties = #{}).
  3626. republish_action(Topic, Payload, UserProperties, MQTTProperties) ->
  3627. #{
  3628. function => republish,
  3629. args => #{
  3630. payload => Payload,
  3631. topic => Topic,
  3632. qos => 0,
  3633. retain => false,
  3634. mqtt_properties => MQTTProperties,
  3635. user_properties => UserProperties
  3636. }
  3637. }.
  3638. action_response(Selected, Envs, Args) ->
  3639. ?tp(action_response, #{
  3640. selected => Selected,
  3641. envs => Envs,
  3642. args => Args
  3643. }),
  3644. ok.
  3645. make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
  3646. SQL = <<"select * from \"simple/topic\"">>,
  3647. make_simple_rule(RuleId, SQL, Ts).
  3648. make_simple_rule(RuleId) when is_binary(RuleId) ->
  3649. SQL = <<"select * from \"simple/topic\"">>,
  3650. make_simple_rule(RuleId, SQL).
  3651. make_simple_rule(RuleId, SQL) when is_binary(RuleId) ->
  3652. make_simple_rule(RuleId, SQL, erlang:system_time(millisecond)).
  3653. make_simple_rule(RuleId, SQL, Ts) when is_binary(RuleId) ->
  3654. #{
  3655. id => RuleId,
  3656. sql => SQL,
  3657. actions => [#{function => console, args => #{}}],
  3658. description => <<"simple rule">>,
  3659. created_at => Ts
  3660. }.
  3661. action_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) ->
  3662. ct:pal("applying action_record_triggered_events: ~p", [Data]),
  3663. ets:insert(events_record_tab, {EventName, Data}).
  3664. verify_event(EventName) ->
  3665. ct:sleep(50),
  3666. case ets:lookup(events_record_tab, EventName) of
  3667. [] ->
  3668. ct:fail({no_such_event, EventName, ets:tab2list(events_record_tab)});
  3669. Records ->
  3670. [
  3671. begin
  3672. %% verify fields can be formatted to JSON string
  3673. _ = emqx_utils_json:encode(Fields),
  3674. %% verify metadata fields
  3675. verify_metadata_fields(EventName, Fields),
  3676. %% verify available fields for each event name
  3677. verify_event_fields(EventName, Fields)
  3678. end
  3679. || {_Name, Fields} <- Records
  3680. ]
  3681. end.
  3682. verify_metadata_fields(_EventName, #{metadata := Metadata}) ->
  3683. ?assertMatch(
  3684. #{rule_id := <<"rule:t_events">>},
  3685. Metadata
  3686. ).
  3687. verify_event_fields('message.publish', Fields) ->
  3688. #{
  3689. id := ID,
  3690. clientid := ClientId,
  3691. username := Username,
  3692. payload := Payload,
  3693. peerhost := PeerHost,
  3694. topic := Topic,
  3695. qos := QoS,
  3696. flags := Flags,
  3697. pub_props := Properties,
  3698. timestamp := Timestamp,
  3699. publish_received_at := EventAt
  3700. } = Fields,
  3701. Now = erlang:system_time(millisecond),
  3702. TimestampElapse = Now - Timestamp,
  3703. RcvdAtElapse = Now - EventAt,
  3704. ?assert(is_binary(ID)),
  3705. ?assertEqual(<<"c_event">>, ClientId),
  3706. ?assertEqual(<<"u_event">>, Username),
  3707. ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
  3708. verify_ipaddr(PeerHost),
  3709. ?assertEqual(<<"t1">>, Topic),
  3710. ?assertEqual(1, QoS),
  3711. ?assert(is_map(Flags)),
  3712. ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
  3713. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
  3714. ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
  3715. ?assert(EventAt =< Timestamp);
  3716. verify_event_fields('client.connected', Fields) ->
  3717. #{
  3718. clientid := ClientId,
  3719. username := Username,
  3720. mountpoint := MountPoint,
  3721. peername := PeerName,
  3722. sockname := SockName,
  3723. proto_name := ProtoName,
  3724. proto_ver := ProtoVer,
  3725. keepalive := Keepalive,
  3726. clean_start := CleanStart,
  3727. expiry_interval := ExpiryInterval,
  3728. is_bridge := IsBridge,
  3729. conn_props := Properties,
  3730. timestamp := Timestamp,
  3731. connected_at := EventAt
  3732. } = Fields,
  3733. Now = erlang:system_time(millisecond),
  3734. TimestampElapse = Now - Timestamp,
  3735. RcvdAtElapse = Now - EventAt,
  3736. ?assert(is_binary(MountPoint) orelse MountPoint == undefined),
  3737. ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
  3738. ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
  3739. verify_peername(PeerName),
  3740. verify_peername(SockName),
  3741. ?assertEqual(<<"MQTT">>, ProtoName),
  3742. ?assertEqual(5, ProtoVer),
  3743. ?assert(is_integer(Keepalive)),
  3744. ?assert(is_boolean(CleanStart)),
  3745. ?assertEqual(60, ExpiryInterval),
  3746. ?assertEqual(false, IsBridge),
  3747. ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
  3748. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
  3749. ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
  3750. ?assert(EventAt =< Timestamp);
  3751. verify_event_fields('client.disconnected', Fields) ->
  3752. #{
  3753. reason := Reason,
  3754. clientid := ClientId,
  3755. username := Username,
  3756. peername := PeerName,
  3757. sockname := SockName,
  3758. disconn_props := Properties,
  3759. timestamp := Timestamp,
  3760. disconnected_at := EventAt
  3761. } = Fields,
  3762. Now = erlang:system_time(millisecond),
  3763. TimestampElapse = Now - Timestamp,
  3764. RcvdAtElapse = Now - EventAt,
  3765. ?assert(is_atom(Reason)),
  3766. ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
  3767. ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
  3768. verify_peername(PeerName),
  3769. verify_peername(SockName),
  3770. ?assertMatch(#{'User-Property' := #{<<"reason">> := <<"normal">>}}, Properties),
  3771. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
  3772. ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
  3773. ?assert(EventAt =< Timestamp);
  3774. verify_event_fields(SubUnsub, Fields) when
  3775. SubUnsub == 'session.subscribed';
  3776. SubUnsub == 'session.unsubscribed'
  3777. ->
  3778. #{
  3779. clientid := ClientId,
  3780. username := Username,
  3781. peerhost := PeerHost,
  3782. topic := Topic,
  3783. qos := QoS,
  3784. timestamp := Timestamp
  3785. } = Fields,
  3786. Now = erlang:system_time(millisecond),
  3787. TimestampElapse = Now - Timestamp,
  3788. ?assert(is_atom(reason)),
  3789. ?assertEqual(<<"c_event2">>, ClientId),
  3790. ?assertEqual(<<"u_event2">>, Username),
  3791. verify_ipaddr(PeerHost),
  3792. ?assertEqual(<<"t1">>, Topic),
  3793. ?assertEqual(1, QoS),
  3794. PropKey =
  3795. case SubUnsub of
  3796. 'session.subscribed' -> sub_props;
  3797. 'session.unsubscribed' -> unsub_props
  3798. end,
  3799. ?assertMatch(
  3800. #{'User-Property' := #{<<"topic_name">> := <<"t1">>}},
  3801. maps:get(PropKey, Fields)
  3802. ),
  3803. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000);
  3804. verify_event_fields('delivery.dropped', Fields) ->
  3805. #{
  3806. event := 'delivery.dropped',
  3807. id := ID,
  3808. metadata := #{rule_id := RuleId},
  3809. reason := Reason,
  3810. clientid := ClientId,
  3811. username := Username,
  3812. from_clientid := FromClientId,
  3813. from_username := FromUsername,
  3814. node := Node,
  3815. payload := Payload,
  3816. peerhost := PeerHost,
  3817. pub_props := Properties,
  3818. publish_received_at := EventAt,
  3819. qos := QoS,
  3820. flags := Flags,
  3821. timestamp := Timestamp,
  3822. topic := Topic
  3823. } = Fields,
  3824. Now = erlang:system_time(millisecond),
  3825. TimestampElapse = Now - Timestamp,
  3826. RcvdAtElapse = Now - EventAt,
  3827. ?assert(is_binary(ID)),
  3828. ?assertEqual(<<"rule:t_events">>, RuleId),
  3829. ?assertEqual(no_local, Reason),
  3830. ?assertEqual(node(), Node),
  3831. ?assertEqual(<<"c_event">>, ClientId),
  3832. ?assertEqual(<<"u_event">>, Username),
  3833. ?assertEqual(<<"c_event">>, FromClientId),
  3834. ?assertEqual(<<"u_event">>, FromUsername),
  3835. ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
  3836. verify_ipaddr(PeerHost),
  3837. ?assertEqual(<<"t1">>, Topic),
  3838. ?assertEqual(1, QoS),
  3839. ?assert(is_map(Flags)),
  3840. ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
  3841. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
  3842. ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
  3843. ?assert(EventAt =< Timestamp);
  3844. verify_event_fields('message.dropped', Fields) ->
  3845. #{
  3846. id := ID,
  3847. reason := Reason,
  3848. clientid := ClientId,
  3849. username := Username,
  3850. payload := Payload,
  3851. peerhost := PeerHost,
  3852. topic := Topic,
  3853. qos := QoS,
  3854. flags := Flags,
  3855. pub_props := Properties,
  3856. timestamp := Timestamp,
  3857. publish_received_at := EventAt
  3858. } = Fields,
  3859. Now = erlang:system_time(millisecond),
  3860. TimestampElapse = Now - Timestamp,
  3861. RcvdAtElapse = Now - EventAt,
  3862. ?assert(is_binary(ID)),
  3863. ?assert(is_atom(Reason)),
  3864. ?assertEqual(<<"c_event">>, ClientId),
  3865. ?assertEqual(<<"u_event">>, Username),
  3866. ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
  3867. verify_ipaddr(PeerHost),
  3868. ?assertEqual(<<"t1">>, Topic),
  3869. ?assertEqual(1, QoS),
  3870. ?assert(is_map(Flags)),
  3871. ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
  3872. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
  3873. ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
  3874. ?assert(EventAt =< Timestamp);
  3875. verify_event_fields('message.delivered', Fields) ->
  3876. #{
  3877. id := ID,
  3878. clientid := ClientId,
  3879. username := Username,
  3880. from_clientid := FromClientId,
  3881. from_username := FromUsername,
  3882. payload := Payload,
  3883. peerhost := PeerHost,
  3884. topic := Topic,
  3885. qos := QoS,
  3886. flags := Flags,
  3887. pub_props := Properties,
  3888. timestamp := Timestamp,
  3889. publish_received_at := EventAt
  3890. } = Fields,
  3891. Now = erlang:system_time(millisecond),
  3892. TimestampElapse = Now - Timestamp,
  3893. RcvdAtElapse = Now - EventAt,
  3894. ?assert(is_binary(ID)),
  3895. ?assertEqual(<<"c_event2">>, ClientId),
  3896. ?assertEqual(<<"u_event2">>, Username),
  3897. ?assertEqual(<<"c_event">>, FromClientId),
  3898. ?assertEqual(<<"u_event">>, FromUsername),
  3899. ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
  3900. verify_ipaddr(PeerHost),
  3901. ?assertEqual(<<"t1">>, Topic),
  3902. ?assertEqual(1, QoS),
  3903. ?assert(is_map(Flags)),
  3904. ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
  3905. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
  3906. ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
  3907. ?assert(EventAt =< Timestamp);
  3908. verify_event_fields('message.acked', Fields) ->
  3909. #{
  3910. id := ID,
  3911. clientid := ClientId,
  3912. username := Username,
  3913. from_clientid := FromClientId,
  3914. from_username := FromUsername,
  3915. payload := Payload,
  3916. peerhost := PeerHost,
  3917. topic := Topic,
  3918. qos := QoS,
  3919. flags := Flags,
  3920. pub_props := PubProps,
  3921. puback_props := PubAckProps,
  3922. timestamp := Timestamp,
  3923. publish_received_at := EventAt
  3924. } = Fields,
  3925. Now = erlang:system_time(millisecond),
  3926. TimestampElapse = Now - Timestamp,
  3927. RcvdAtElapse = Now - EventAt,
  3928. ?assert(is_binary(ID)),
  3929. ?assertEqual(<<"c_event2">>, ClientId),
  3930. ?assertEqual(<<"u_event2">>, Username),
  3931. ?assertEqual(<<"c_event">>, FromClientId),
  3932. ?assertEqual(<<"u_event">>, FromUsername),
  3933. ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
  3934. verify_ipaddr(PeerHost),
  3935. ?assertEqual(<<"t1">>, Topic),
  3936. ?assertEqual(1, QoS),
  3937. ?assert(is_map(Flags)),
  3938. ?assertMatch(#{'Message-Expiry-Interval' := 60}, PubProps),
  3939. ?assert(is_map(PubAckProps)),
  3940. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000),
  3941. ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000),
  3942. ?assert(EventAt =< Timestamp);
  3943. verify_event_fields('client.connack', Fields) ->
  3944. #{
  3945. clientid := ClientId,
  3946. clean_start := CleanStart,
  3947. username := Username,
  3948. peername := PeerName,
  3949. sockname := SockName,
  3950. proto_name := ProtoName,
  3951. proto_ver := ProtoVer,
  3952. keepalive := Keepalive,
  3953. expiry_interval := ExpiryInterval,
  3954. conn_props := Properties,
  3955. reason_code := Reason,
  3956. timestamp := Timestamp
  3957. } = Fields,
  3958. Now = erlang:system_time(millisecond),
  3959. TimestampElapse = Now - Timestamp,
  3960. ?assert(lists:member(Reason, [success, bad_username_or_password])),
  3961. ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>, <<"c_event3">>])),
  3962. ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>, <<"u_event3">>])),
  3963. verify_peername(PeerName),
  3964. verify_peername(SockName),
  3965. ?assertEqual(<<"MQTT">>, ProtoName),
  3966. ?assertEqual(5, ProtoVer),
  3967. ?assert(is_integer(Keepalive)),
  3968. ?assert(is_boolean(CleanStart)),
  3969. ?assertEqual(60000, ExpiryInterval),
  3970. ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
  3971. ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000);
  3972. verify_event_fields('client.check_authz_complete', Fields) ->
  3973. #{
  3974. clientid := ClientId,
  3975. action := Action,
  3976. result := Result,
  3977. topic := Topic,
  3978. authz_source := AuthzSource,
  3979. username := Username
  3980. } = Fields,
  3981. ?assertEqual(<<"t1">>, Topic),
  3982. ?assert(lists:member(Action, [subscribe, publish])),
  3983. ?assert(lists:member(Result, [allow, deny])),
  3984. ?assert(
  3985. lists:member(AuthzSource, [
  3986. cache,
  3987. default,
  3988. file,
  3989. http,
  3990. mongodb,
  3991. mysql,
  3992. redis,
  3993. postgresql,
  3994. built_in_database
  3995. ])
  3996. ),
  3997. ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
  3998. ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])).
  3999. verify_peername(PeerName) ->
  4000. case string:split(PeerName, ":") of
  4001. [IPAddrS, PortS] ->
  4002. verify_ipaddr(IPAddrS),
  4003. _ = binary_to_integer(PortS);
  4004. _ ->
  4005. ct:fail({invalid_peername, PeerName})
  4006. end.
  4007. verify_ipaddr(IPAddrS) ->
  4008. ?assertMatch({ok, _}, inet:parse_address(binary_to_list(IPAddrS))).
  4009. init_events_counters() ->
  4010. ets:new(events_record_tab, [named_table, bag, public]).
  4011. user_properties(PairsMap) ->
  4012. #{'User-Property' => maps:to_list(PairsMap)}.
  4013. %%------------------------------------------------------------------------------
  4014. %% Start Apps
  4015. %%------------------------------------------------------------------------------
  4016. deps_path(App, RelativePath) ->
  4017. Path0 = code:lib_dir(App),
  4018. Path =
  4019. case file:read_link(Path0) of
  4020. {ok, Resolved} -> Resolved;
  4021. {error, _} -> Path0
  4022. end,
  4023. filename:join([Path, RelativePath]).
  4024. local_path(RelativePath) ->
  4025. deps_path(emqx_rule_engine, RelativePath).
  4026. create_rules(Rules) ->
  4027. lists:foreach(fun create_rule/1, Rules).
  4028. create_rule(Rule) ->
  4029. {ok, _} = emqx_rule_engine:create_rule(Rule),
  4030. ok.
  4031. delete_rules_by_ids(Ids) ->
  4032. lists:foreach(
  4033. fun(Id) ->
  4034. ok = emqx_rule_engine:delete_rule(Id)
  4035. end,
  4036. Ids
  4037. ).
  4038. delete_rule(#{id := Id}) ->
  4039. ok = emqx_rule_engine:delete_rule(Id);
  4040. delete_rule(Id) when is_binary(Id) ->
  4041. ok = emqx_rule_engine:delete_rule(Id).