Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqqueue.erl",
"src/rabbit_amqqueue_control.erl",
"src/rabbit_amqqueue_process.erl",
"src/rabbit_amqqueue_sup.erl",
"src/rabbit_amqqueue_sup_sup.erl",
Expand Down Expand Up @@ -297,6 +298,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqqueue.erl",
"src/rabbit_amqqueue_control.erl",
"src/rabbit_amqqueue_process.erl",
"src/rabbit_amqqueue_sup.erl",
"src/rabbit_amqqueue_sup_sup.erl",
Expand Down Expand Up @@ -559,6 +561,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_access_control.erl",
"src/rabbit_alarm.erl",
"src/rabbit_amqqueue.erl",
"src/rabbit_amqqueue_control.erl",
"src/rabbit_amqqueue_process.erl",
"src/rabbit_amqqueue_sup.erl",
"src/rabbit_amqqueue_sup_sup.erl",
Expand Down
109 changes: 108 additions & 1 deletion deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@
-export([delete_crashed/1,
delete_crashed/2,
delete_crashed_internal/2]).

-export([delete_with/4, delete_with/6]).
-export([pid_of/1, pid_of/2]).
-export([pid_or_crashed/2]).
-export([mark_local_durable_queues_stopped/1]).

-export([rebalance/3]).
Expand All @@ -71,6 +72,8 @@
-export([prepend_extra_bcc/1]).
-export([queue/1, queue_names/1]).

-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
Expand Down Expand Up @@ -116,6 +119,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
active, activity_status, arguments]).
-define(KILL_QUEUE_DELAY_INTERVAL, 100).

warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
Expand Down Expand Up @@ -1601,6 +1605,51 @@ delete_immediately_by_resource(Resources) ->
delete(Q, IfUnused, IfEmpty, ActingUser) ->
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).

-spec delete_with(amqqueue:amqqueue() | name(), boolean(), boolean(), rabbit_types:username()) ->
rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
delete_with(QueueName, IfUnused, IfEmpty, ActingUser) ->
delete_with(QueueName, undefined, IfUnused, IfEmpty, ActingUser, false).

-spec delete_with(amqqueue:amqqueue() | name(), pid() | undefined, boolean(), boolean(), rabbit_types:username(), boolean()) ->
rabbit_types:ok(integer()) | rabbit_misc:channel_or_connection_exit().
delete_with(AMQQueue, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when ?is_amqqueue(AMQQueue) ->
QueueName = amqqueue:get_name(AMQQueue),
delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive);
delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) when is_record(QueueName, resource) ->
case with(
QueueName,
fun (Q) ->
if CheckExclusive ->
check_exclusive_access(Q, ConnPid);
true ->
ok
end,
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username)
end,
fun (not_found) ->
{ok, 0};
({absent, Q, crashed}) ->
_ = delete_crashed(Q, Username),
{ok, 0};
({absent, Q, stopped}) ->
_ = delete_crashed(Q, Username),
{ok, 0};
({absent, Q, Reason}) ->
absent(Q, Reason)
end) of
{error, in_use} ->
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
rabbit_misc:precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]);
{error, {exit, _, _}} ->
%% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}}
{ok, 0};
{ok, Count} ->
{ok, Count};
{protocol_error, Type, Reason, ReasonArgs} ->
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end.

%% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS
delete_crashed(Q) when ?amqqueue_is_classic(Q) ->
rabbit_classic_queue:delete_crashed(Q).
Expand Down Expand Up @@ -2061,3 +2110,61 @@ is_queue_args_combination_permitted(Durable, Exclusive) ->
true ->
rabbit_deprecated_features:is_permitted(transient_nonexcl_queues)
end.

-spec kill_queue_hard(node(), name()) -> ok.
kill_queue_hard(Node, QRes = #resource{kind = queue}) ->
kill_queue_hard(Node, QRes, boom).

-spec kill_queue_hard(node(), name(), atom()) -> ok.
kill_queue_hard(Node, QRes = #resource{kind = queue}, Reason) ->
case kill_queue(Node, QRes, Reason) of
crashed -> ok;
stopped -> ok;
NewPid when is_pid(NewPid) ->
timer:sleep(?KILL_QUEUE_DELAY_INTERVAL),
kill_queue_hard(Node, QRes, Reason);
Error -> Error
end.

-spec kill_queue(node(), name()) -> pid() | crashed | stopped | rabbit_types:error(term()).
kill_queue(Node, QRes = #resource{kind = queue}) ->
kill_queue(Node, QRes, boom).

-spec kill_queue(node(), name(), atom()) -> pid() | crashed | stopped | rabbit_types:error(term()).
kill_queue(Node, QRes = #resource{kind = queue}, Reason = shutdown) ->
Pid1 = pid_or_crashed(Node, QRes),
exit(Pid1, Reason),
rabbit_amqqueue_control:await_state(Node, QRes, stopped),
stopped;
kill_queue(Node, QRes = #resource{kind = queue}, Reason) ->
case pid_or_crashed(Node, QRes) of
Pid1 when is_pid(Pid1) ->
exit(Pid1, Reason),
rabbit_amqqueue_control:await_new_pid(Node, QRes, Pid1);
crashed ->
crashed;
Error ->
Error
end.

-spec pid_or_crashed(node(), name()) -> pid() | crashed | rabbit_types:error(term()).
pid_or_crashed(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
case rpc:call(Node, rabbit_amqqueue, lookup, [QRes]) of
{ok, Q} ->
QPid = amqqueue:get_pid(Q),
State = amqqueue:get_state(Q),
case State of
crashed ->
case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
{error, {queue_supervisor_not_found, _}} -> {error, no_sup};
{ok, SPid} ->
case rabbit_misc:remote_sup_child(Node, SPid) of
{ok, _} -> QPid; %% restarting
{error, no_child} -> crashed %% given up
end
end;
_ -> QPid
end;
Error = {error, _} -> Error;
Reason -> {error, Reason}
end.
57 changes: 57 additions & 0 deletions deps/rabbit/src/rabbit_amqqueue_control.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_amqqueue_control).

-export([await_new_pid/3, await_state/3, await_state/4]).

-define(DEFAULT_AWAIT_STATE_TIMEOUT, 30000).
-define(AWAIT_NEW_PID_DELAY_INTERVAL, 10).
-define(AWAIT_STATE_DELAY_INTERVAL, 100).
-define(AWAIT_STATE_DELAY_TIME_DELTA, 100).

-include_lib("rabbit_common/include/resource.hrl").

-spec await_new_pid(node(), rabbit_amqqueue:name(), pid()) -> pid().
await_new_pid(Node, QRes = #resource{kind = queue}, OldPid) ->
case rabbit_amqqueue:pid_or_crashed(Node, QRes) of
OldPid -> timer:sleep(?AWAIT_NEW_PID_DELAY_INTERVAL),
await_new_pid(Node, QRes, OldPid);
New -> New
end.

-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom()) -> 'ok'.
await_state(Node, QName, State) when is_binary(QName) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
await_state(Node, QRes, State);
await_state(Node, QRes = #resource{kind = queue}, State) ->
await_state(Node, QRes, State, ?DEFAULT_AWAIT_STATE_TIMEOUT).

-spec await_state(node(), rabbit_amqqueue:name() | binary(), atom(), integer()) -> 'ok'.
await_state(Node, QName, State, Time) when is_binary(QName) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
await_state(Node, QRes, State, Time);
await_state(Node, QRes = #resource{kind = queue}, State, Time) ->
case state(Node, QRes) of
State ->
ok;
Other ->
case Time of
0 -> exit({timeout_awaiting_state, State, Other});
_ -> timer:sleep(?AWAIT_STATE_DELAY_INTERVAL),
await_state(Node, QRes, State, Time - ?AWAIT_STATE_DELAY_TIME_DELTA)
end
end.

state(Node, QRes = #resource{virtual_host = VHost, kind = queue}) ->
Infos = rpc:call(Node, rabbit_amqqueue, info_all, [VHost, [name, state]]),
fetch_state(QRes, Infos).

fetch_state(_QRes, []) -> undefined;
fetch_state(QRes, [[{name, QRes}, {state, State}] | _]) -> State;
fetch_state(QRes, [[{name, _}, {state, _State}] | Rem]) ->
fetch_state(QRes, Rem).
65 changes: 14 additions & 51 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -930,15 +930,6 @@ handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol,
{stop, normal, State1}
end.

-spec precondition_failed(string()) -> no_return().

precondition_failed(Format) -> precondition_failed(Format, []).

-spec precondition_failed(string(), [any()]) -> no_return().

precondition_failed(Format, Params) ->
rabbit_misc:protocol_error(precondition_failed, Format, Params).

return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount,
#ch{cfg = Cfg} = State) ->
Expand Down Expand Up @@ -995,15 +986,15 @@ check_user_id_header(#'P_basic'{user_id = Claimed},
tags = Tags}}}) ->
case lists:member(impersonator, Tags) of
true -> ok;
false -> precondition_failed(
false -> rabbit_misc:precondition_failed(
"user_id property set to '~ts' but authenticated user was "
"'~ts'", [Claimed, Actual])
end.

check_expiration_header(Props) ->
case rabbit_basic:parse_expiration(Props) of
{ok, _} -> ok;
{error, E} -> precondition_failed("invalid expiration '~ts': ~tp",
{error, E} -> rabbit_misc:precondition_failed("invalid expiration '~ts': ~tp",
[Props#'P_basic'.expiration, E])
end.

Expand Down Expand Up @@ -1074,15 +1065,15 @@ check_msg_size(Content, MaxMessageSize, GCThreshold) ->
_ ->
"message size ~B is larger than configured max size ~B"
end,
precondition_failed(ErrorMessage,
rabbit_misc:precondition_failed(ErrorMessage,
[Size, MaxMessageSize]);
_ -> ok
end.

check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
case rabbit_vhost_limit:is_over_queue_limit(VHost) of
false -> ok;
{true, Limit} -> precondition_failed("cannot declare queue '~ts': "
{true, Limit} -> rabbit_misc:precondition_failed("cannot declare queue '~ts': "
"queue limit in vhost '~ts' (~tp) is reached",
[QueueName, VHost, Limit])

Expand Down Expand Up @@ -1704,7 +1695,7 @@ handle_method(#'queue.purge'{nowait = NoWait} = Method,
end;

handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
precondition_failed("cannot switch from confirm to tx mode");
rabbit_misc:precondition_failed("cannot switch from confirm to tx mode");

handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
{reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
Expand All @@ -1713,7 +1704,7 @@ handle_method(#'tx.select'{}, _, State) ->
{reply, #'tx.select_ok'{}, State};

handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
rabbit_misc:precondition_failed("channel is not transactional");

handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
limiter = Limiter}) ->
Expand All @@ -1731,7 +1722,7 @@ handle_method(#'tx.commit'{}, _, State = #ch{tx = {Deliveries, Acks},
{noreply, maybe_complete_tx(State3#ch{tx = committing})};

handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
rabbit_misc:precondition_failed("channel is not transactional");

handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
tx = {_Msgs, Acks}}) ->
Expand All @@ -1741,7 +1732,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
tx = new_tx()}};

handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
precondition_failed("cannot switch from tx to confirm mode");
rabbit_misc:precondition_failed("cannot switch from tx to confirm mode");

handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
Expand All @@ -1762,7 +1753,7 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
{ok, {Q, _CParams}} ->
{ok, QStates, Actions} = rabbit_queue_type:credit(Q, CTag, Credit, Drain, QStates0),
{noreply, handle_queue_actions(Actions, State#ch{queue_states = QStates})};
error -> precondition_failed(
error -> rabbit_misc:precondition_failed(
"unknown consumer tag '~ts'", [CTag])
end;

Expand Down Expand Up @@ -2050,7 +2041,7 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
UAMQTail, DeliveryTag, Multiple)
end;
{empty, _} ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
rabbit_misc:precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.

%% Settles (acknowledges) messages at the queue replica process level.
Expand Down Expand Up @@ -2540,7 +2531,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
undefined ->
ok;
{error, {invalid_type, Type}} ->
precondition_failed(
rabbit_misc:precondition_failed(
"invalid type '~ts' for arg '~ts' in ~ts",
[Type, DlxKey, rabbit_misc:rs(QueueName)]);
DLX ->
Expand Down Expand Up @@ -2605,35 +2596,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),

check_configure_permitted(QueueName, User, AuthzContext),
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, Username)
end,
fun (not_found) ->
{ok, 0};
({absent, Q, crashed}) ->
_ = rabbit_classic_queue:delete_crashed(Q, Username),
{ok, 0};
({absent, Q, stopped}) ->
_ = rabbit_classic_queue:delete_crashed(Q, Username),
{ok, 0};
({absent, Q, Reason}) ->
rabbit_amqqueue:absent(Q, Reason)
end) of
{error, in_use} ->
precondition_failed("~ts in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
precondition_failed("~ts not empty", [rabbit_misc:rs(QueueName)]);
{error, {exit, _, _}} ->
%% rabbit_amqqueue:delete()/delegate:invoke might return {error, {exit, _, _}}
{ok, 0};
{ok, Count} ->
{ok, Count};
{protocol_error, Type, Reason, ReasonArgs} ->
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end;
rabbit_amqqueue:delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, true);
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
_ConnPid, AuthzContext, _CollectorPid, VHostPath,
Expand All @@ -2647,7 +2610,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
{error, not_found} ->
ok;
{error, in_use} ->
precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
rabbit_misc:precondition_failed("~ts in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
ok
end;
Expand Down Expand Up @@ -2689,7 +2652,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of
undefined -> ok;
{error, {invalid_type, Type}} ->
precondition_failed(
rabbit_misc:precondition_failed(
"invalid type '~ts' for arg '~ts' in ~ts",
[Type, AeKey, rabbit_misc:rs(ExchangeName)]);
AName -> check_read_permitted(ExchangeName, User, AuthzContext),
Expand Down
Loading