Skip to content

Commit 5c121b1

Browse files
committed
simplify waiting for recovery
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
1 parent 4b27a4a commit 5c121b1

File tree

7 files changed

+48
-39
lines changed

7 files changed

+48
-39
lines changed

examples/p.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use lapin::{
2-
options::*, types::FieldTable, BasicProperties, ChannelState, Connection, ConnectionProperties,
3-
Error,
2+
options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties,
43
};
54
use tracing::info;
65

@@ -85,16 +84,13 @@ fn main() {
8584
}
8685
Err(err) => {
8786
println!("GOT ERROR");
88-
match err {
89-
Error::InvalidChannelState(ChannelState::Reconnecting, Some(notifier)) => {
90-
notifier.await
91-
}
92-
err => {
93-
if !err.is_amqp_soft_error() {
94-
panic!("{}", err);
95-
}
96-
errors += 1;
97-
}
87+
let (soft, notifier) = err.is_amqp_soft_error();
88+
if !soft {
89+
panic!("{}", err);
90+
}
91+
errors += 1;
92+
if let Some(notifier) = notifier {
93+
notifier.await
9894
}
9995
}
10096
}

src/acker.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,13 @@ impl Acker {
7878

7979
async fn rpc<F: Fn(&InternalRPCHandle, PromiseResolver<()>)>(&self, f: F) -> Result<()> {
8080
if self.used.swap(true, Ordering::SeqCst) {
81-
return Err(Error::ProtocolError(AMQPError::new(
82-
AMQPSoftError::PRECONDITIONFAILED.into(),
83-
"Attempted to use an already used Acker".into(),
84-
)));
81+
return Err(Error::ProtocolError(
82+
AMQPError::new(
83+
AMQPSoftError::PRECONDITIONFAILED.into(),
84+
"Attempted to use an already used Acker".into(),
85+
),
86+
None,
87+
));
8588
}
8689
if let Some(error) = self.error.as_ref() {
8790
error.check()?;

src/channel.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ impl Channel {
295295
class_id,
296296
method_id,
297297
);
298-
Err(Error::ProtocolError(error))
298+
Err(Error::ProtocolError(error, None))
299299
}
300300
}
301301

@@ -426,7 +426,7 @@ impl Channel {
426426
class_id,
427427
method_id,
428428
);
429-
Err(Error::ProtocolError(error))
429+
Err(Error::ProtocolError(error, None))
430430
}
431431

432432
pub(crate) fn handle_content_header_frame(
@@ -465,7 +465,7 @@ impl Channel {
465465
class_id,
466466
0,
467467
);
468-
let error = Error::ProtocolError(error);
468+
let error = Error::ProtocolError(error, None);
469469
self.set_connection_error(error.clone());
470470
Err(error)
471471
},
@@ -534,7 +534,7 @@ impl Channel {
534534
)
535535
.await
536536
});
537-
Err(Error::ProtocolError(err))
537+
Err(Error::ProtocolError(err, None))
538538
}
539539

540540
fn before_connection_start_ok(
@@ -553,7 +553,7 @@ impl Channel {
553553
}
554554

555555
fn on_connection_close_ok_sent(&self, error: Error) {
556-
if let Error::ProtocolError(_) = error {
556+
if let Error::ProtocolError(_, _) = error {
557557
self.internal_rpc.set_connection_error(error);
558558
} else {
559559
self.internal_rpc.set_connection_closed(error);
@@ -573,7 +573,7 @@ impl Channel {
573573

574574
fn on_channel_close_ok_sent(&self, error: Option<Error>) {
575575
if !self.recovery_config.auto_recover_channels
576-
|| !error.as_ref().map_or(false, Error::is_amqp_soft_error)
576+
|| !error.as_ref().map_or(false, |e| e.is_amqp_soft_error().0)
577577
{
578578
self.set_closed(
579579
error
@@ -822,7 +822,7 @@ impl Channel {
822822
?error,
823823
"Connection closed",
824824
);
825-
Error::ProtocolError(error)
825+
Error::ProtocolError(error, None)
826826
})
827827
.unwrap_or_else(|error| {
828828
error!(%error);
@@ -911,13 +911,13 @@ impl Channel {
911911
channel=%self.id, ?method, ?error,
912912
"Channel closed"
913913
);
914-
Error::ProtocolError(error)
914+
Error::ProtocolError(error, None)
915915
});
916916
match (
917917
self.recovery_config.auto_recover_channels,
918918
error.clone().ok(),
919919
) {
920-
(true, Some(error)) if error.is_amqp_soft_error() => {
920+
(true, Some(error)) if error.is_amqp_soft_error().0 => {
921921
self.status.set_reconnecting(error)
922922
}
923923
(_, err) => self.set_closing(err),

src/channel_recovery_context.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ pub(crate) struct ChannelRecoveryContext {
1414

1515
impl ChannelRecoveryContext {
1616
pub(crate) fn new(cause: Error) -> Self {
17+
let notifier = Notifier::default();
1718
Self {
18-
cause,
19+
cause: cause.with_notifier(notifier.clone()),
1920
expected_replies: None,
20-
notifier: Notifier::default(),
21+
notifier,
2122
}
2223
}
2324

src/channels.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ impl Channels {
225225
.await
226226
});
227227
}
228-
return Err(Error::ProtocolError(error));
228+
return Err(Error::ProtocolError(error, None));
229229
}
230230
}
231231
AMQPFrame::Header(channel_id, class_id, header) => {
@@ -248,7 +248,7 @@ impl Channels {
248248
.await
249249
});
250250
}
251-
return Err(Error::ProtocolError(error));
251+
return Err(Error::ProtocolError(error, None));
252252
} else {
253253
self.handle_content_header_frame(
254254
channel_id,

src/error.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub enum Error {
2727

2828
IOError(Arc<io::Error>),
2929
ParsingError(ParserError),
30-
ProtocolError(AMQPError),
30+
ProtocolError(AMQPError, Option<Notifier>),
3131
SerialisationError(Arc<GenError>),
3232

3333
MissingHeartbeatError,
@@ -53,23 +53,30 @@ impl Error {
5353
}
5454
}
5555

56-
pub fn is_amqp_soft_error(&self) -> bool {
57-
if let Error::ProtocolError(e) = self {
56+
pub fn is_amqp_soft_error(&self) -> (bool, Option<Notifier>) {
57+
if let Error::ProtocolError(e, notifier) = self {
5858
if let AMQPErrorKind::Soft(_) = e.kind() {
59-
return true;
59+
return (true, notifier.clone());
6060
}
6161
}
62-
false
62+
(false, None)
6363
}
6464

6565
pub fn is_amqp_hard_error(&self) -> bool {
66-
if let Error::ProtocolError(e) = self {
66+
if let Error::ProtocolError(e, _) = self {
6767
if let AMQPErrorKind::Hard(_) = e.kind() {
6868
return true;
6969
}
7070
}
7171
false
7272
}
73+
74+
pub(crate) fn with_notifier(self, notifier: Notifier) -> Self {
75+
match self {
76+
Self::ProtocolError(err, _) => Self::ProtocolError(err, Some(notifier)),
77+
err => err,
78+
}
79+
}
7380
}
7481

7582
impl fmt::Display for Error {
@@ -91,7 +98,7 @@ impl fmt::Display for Error {
9198

9299
Error::IOError(e) => write!(f, "IO error: {}", e),
93100
Error::ParsingError(e) => write!(f, "failed to parse: {}", e),
94-
Error::ProtocolError(e) => write!(f, "protocol error: {}", e),
101+
Error::ProtocolError(e, _) => write!(f, "protocol error: {}", e),
95102
Error::SerialisationError(e) => write!(f, "failed to serialise: {}", e),
96103

97104
Error::MissingHeartbeatError => {
@@ -119,7 +126,7 @@ impl error::Error for Error {
119126
match self {
120127
Error::IOError(e) => Some(&**e),
121128
Error::ParsingError(e) => Some(e),
122-
Error::ProtocolError(e) => Some(e),
129+
Error::ProtocolError(e, _) => Some(e),
123130
Error::SerialisationError(e) => Some(&**e),
124131
_ => None,
125132
}
@@ -156,7 +163,9 @@ impl PartialEq for Error {
156163
false
157164
}
158165
(ParsingError(left_inner), ParsingError(right_inner)) => left_inner == right_inner,
159-
(ProtocolError(left_inner), ProtocolError(right_inner)) => left_inner == right_inner,
166+
(ProtocolError(left_inner, _), ProtocolError(right_inner, _)) => {
167+
left_inner == right_inner
168+
}
160169
(SerialisationError(_), SerialisationError(_)) => {
161170
error!("Unable to compare lapin::Error::SerialisationError");
162171
false

src/io_loop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ impl IoLoop {
440440
0,
441441
0,
442442
);
443-
self.critical_error(Error::ProtocolError(error))?;
443+
self.critical_error(Error::ProtocolError(error, None))?;
444444
}
445445
self.receive_buffer.consume(consumed);
446446
Ok(Some(f))

0 commit comments

Comments
 (0)