Skip to content

Commit 40e68e9

Browse files
committed
Add CASE/WHEN
1 parent a962e36 commit 40e68e9

File tree

4 files changed

+101
-20
lines changed

4 files changed

+101
-20
lines changed

src/emqx_rule_runtime.erl

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
-export([ apply_rule/2
3838
, columns/1
39+
, clear_rule_payload/0
3940
]).
4041

4142
-import(emqx_rule_maps,
@@ -123,7 +124,7 @@ rules_for(Hook) ->
123124

124125
-spec(apply_rules(list(emqx_rule_engine:rule()), map()) -> ok).
125126
apply_rules([], _Input) ->
126-
erlang:erase(rule_payload),
127+
clear_rule_payload(),
127128
ok;
128129
apply_rules([#rule{enabled = false}|More], Input) ->
129130
apply_rules(More, Input);
@@ -181,6 +182,9 @@ apply_rule(#rule{id = RuleId,
181182
{error, nomatch}
182183
end.
183184

185+
clear_rule_payload() ->
186+
erlang:erase(rule_payload).
187+
184188
%% Step1 -> Select and transform data
185189
select_and_transform(Fields, Input) ->
186190
select_and_transform(Fields, Input, #{}).
@@ -317,6 +321,10 @@ eval({const, Val}, _Input) ->
317321
Val;
318322
eval({Op, L, R}, Input) when ?is_arith(Op) ->
319323
apply_func(Op, [eval(L, Input), eval(R, Input)], Input);
324+
eval({'case', undefined, CaseClauses, ElseClauses}, Input) ->
325+
eval_case_clauses(CaseClauses, ElseClauses, Input);
326+
eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) ->
327+
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input);
320328
eval({'fun', Name, Args}, Input) ->
321329
apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input).
322330

@@ -332,6 +340,33 @@ alias({const, Val}) ->
332340
Val;
333341
alias(_) -> undefined.
334342

343+
eval_case_clauses([], ElseClauses, Input) ->
344+
case ElseClauses of
345+
undefined -> undefined;
346+
_ -> eval(ElseClauses, Input)
347+
end;
348+
eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
349+
case match_conditions(Cond, Input) of
350+
true ->
351+
eval(Clause, Input);
352+
_ ->
353+
eval_case_clauses(CaseClauses, ElseClauses, Input)
354+
end.
355+
356+
eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
357+
case ElseClauses of
358+
undefined -> undefined;
359+
_ -> eval(ElseClauses, Input)
360+
end;
361+
eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
362+
ConResult = eval(Cond, Input),
363+
case eval(CaseOn, Input) of
364+
ConResult ->
365+
eval(Clause, Input);
366+
_ ->
367+
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input)
368+
end.
369+
335370
apply_func(Name, Args, Input) when is_atom(Name) ->
336371
case erlang:apply(emqx_rule_funcs, Name, Args) of
337372
Func when is_function(Func) ->

src/emqx_rule_sqlparser.erl

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -104,22 +104,23 @@ select_where(#select{where = Where}) ->
104104
Where.
105105

106106
preprocess(#select{fields = Fields, is_foreach = IsForeach, doeach = DoEach, incase = InCase, from = Hooks, where = Conditions}) ->
107-
Selected = [preprocess_field(Field) || Field <- Fields],
108107
Froms = [hook(unquote(H)) || H <- Hooks],
109-
FullColumns = as_columns(Selected) ++ fixed_columns(Froms),
108+
FixedColumns = fixed_columns(Froms),
109+
Selected = [preprocess_field(Field, FixedColumns) || Field <- Fields],
110+
FullColumns = as_columns(Selected) ++ FixedColumns,
110111
#select{is_foreach = IsForeach,
111112
fields = Selected,
112-
doeach = [preprocess_field(Each) || Each <- DoEach],
113+
doeach = [preprocess_field(Each, FullColumns) || Each <- DoEach],
113114
incase = preprocess_condition(InCase, FullColumns),
114115
from = Froms,
115116
where = preprocess_condition(Conditions, FullColumns)}.
116117

117-
preprocess_field(<<"*">>) ->
118+
preprocess_field(<<"*">>, _Columns) ->
118119
'*';
119-
preprocess_field({'as', Field, Alias}) when is_binary(Alias) ->
120-
{'as', transform_select_field(Field), transform_alias(Alias)};
121-
preprocess_field(Field) ->
122-
transform_select_field(Field).
120+
preprocess_field({'as', Field, Alias}, Columns) when is_binary(Alias) ->
121+
{'as', transform_select_field(Field, Columns), transform_alias(Alias)};
122+
preprocess_field(Field, Columns) ->
123+
transform_select_field(Field, Columns).
123124

124125
preprocess_condition({Op, L, R}, Columns) when ?is_logical(Op) orelse ?is_comp(Op) ->
125126
{Op, preprocess_condition(L, Columns), preprocess_condition(R, Columns)};
@@ -151,15 +152,31 @@ do_transform_field({'fun', Name, Args}, Columns) when is_binary(Name) ->
151152
Fun = list_to_existing_atom(binary_to_list(Name)),
152153
{'fun', Fun, [transform_field(Arg, Columns) || Arg <- Args]}.
153154

154-
transform_select_field({const, Val}) ->
155+
transform_select_field({const, Val}, _Columns) ->
155156
{const, Val};
156-
transform_select_field({Op, Arg1, Arg2}) when ?is_arith(Op) ->
157-
{Op, transform_select_field(Arg1), transform_select_field(Arg2)};
158-
transform_select_field(Var) when is_binary(Var) ->
159-
{var, escape(parse_nested(Var))};
160-
transform_select_field({'fun', Name, Args}) when is_binary(Name) ->
157+
transform_select_field({Op, Arg1, Arg2}, Columns) when ?is_arith(Op) ->
158+
{Op, transform_select_field(Arg1, Columns), transform_select_field(Arg2, Columns)};
159+
transform_select_field(Var, Columns) when is_binary(Var) ->
160+
{var, validate_var(escape(parse_nested(Var)), Columns)};
161+
transform_select_field({'case', CaseOn, CaseClauses, ElseClause}, Columns) ->
162+
{'case', transform_caseon(CaseOn, Columns),
163+
transform_case_clause(CaseClauses, Columns),
164+
transform_caseelse(ElseClause, Columns)};
165+
transform_select_field({'fun', Name, Args}, Columns) when is_binary(Name) ->
161166
Fun = list_to_existing_atom(binary_to_list(Name)),
162-
{'fun', Fun, [transform_select_field(Arg) || Arg <- Args]}.
167+
{'fun', Fun, [transform_select_field(Arg, Columns) || Arg <- Args]}.
168+
169+
transform_caseon(<<>>, _Columns) -> undefined;
170+
transform_caseon(CaseOn, Columns) ->
171+
transform_select_field(CaseOn, Columns).
172+
173+
transform_case_clause(CaseClauses, Columns) ->
174+
[{preprocess_condition(Cond, Columns), transform_select_field(Return, Columns)}
175+
|| {Cond, Return} <- CaseClauses].
176+
177+
transform_caseelse({}, _Columns) -> undefined;
178+
transform_caseelse(ElseClause, Columns) ->
179+
transform_select_field(ElseClause, Columns).
163180

164181
validate_var(Var, SupportedColumns) ->
165182
case {Var, lists:member(Var, SupportedColumns)} of

src/emqx_rule_sqltester.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
4343
#action_instance_params{id = ActInstId,
4444
params = #{},
4545
apply = sql_test_action()}),
46+
emqx_rule_runtime:clear_rule_payload(),
4647
emqx_rule_runtime:apply_rule(Rule, FullContext)
4748
of
4849
{ok, Data} -> {ok, flatten(Data)};

test/emqx_rule_engine_SUITE.erl

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ groups() ->
8888
t_sqlselect_1,
8989
t_sqlselect_2,
9090
t_sqlselect_3,
91-
t_sqlparse_foreach
91+
t_sqlparse_foreach,
92+
t_sqlparse_case_when
9293
]}
9394
].
9495

@@ -807,15 +808,42 @@ t_sqlparse_foreach(_Config) ->
807808
"incase is_not_null(s.cmd) "
808809
"from \"message.publish\" "
809810
"where topic =~ 't/#'",
810-
{ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
811-
ct:log("======Select: ~p", [Select]),
812811
Res = emqx_rule_sqltester:test(
813812
#{<<"rawsql">> => Sql,
814813
<<"ctx">> =>
815814
#{<<"payload">> =>
816815
<<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
817816
<<"topic">> => <<"t/a">>}}),
818-
ct:log("======Result: ~p", [Res]),
817+
?assertMatch({ok,[#{msg_type := <<"1">>},#{msg_type := <<"2">>}]}, Res).
818+
819+
t_sqlparse_case_when(_Config) ->
820+
Sql = "select "
821+
" case when payload.x < 0 then 0 "
822+
" when payload.x > 7 then 7 "
823+
" else payload.x "
824+
" end as y "
825+
"from \"message.publish\" "
826+
"where topic =~ 't/#'",
827+
?assertMatch({ok, #{y := 1}}, emqx_rule_sqltester:test(
828+
#{<<"rawsql">> => Sql,
829+
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 1}">>,
830+
<<"topic">> => <<"t/a">>}})),
831+
?assertMatch({ok, #{y := 0}}, emqx_rule_sqltester:test(
832+
#{<<"rawsql">> => Sql,
833+
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 0}">>,
834+
<<"topic">> => <<"t/a">>}})),
835+
?assertMatch({ok, #{y := 0}}, emqx_rule_sqltester:test(
836+
#{<<"rawsql">> => Sql,
837+
<<"ctx">> => #{<<"payload">> => <<"{\"x\": -1}">>,
838+
<<"topic">> => <<"t/a">>}})),
839+
?assertMatch({ok, #{y := 7}}, emqx_rule_sqltester:test(
840+
#{<<"rawsql">> => Sql,
841+
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 7}">>,
842+
<<"topic">> => <<"t/a">>}})),
843+
?assertMatch({ok, #{y := 7}}, emqx_rule_sqltester:test(
844+
#{<<"rawsql">> => Sql,
845+
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 8}">>,
846+
<<"topic">> => <<"t/a">>}})),
819847
ok.
820848

821849
%%------------------------------------------------------------------------------

0 commit comments

Comments
 (0)