fix: single-connection mode connection reestablishment #39
Changes from all commits: 5e77f79, 8f8e1b7, 450b668, 94dee1d, 95c612a, 72a8878, 1278a96, 9d87dd3
@@ -4,14 +4,19 @@ use std::fmt;
 use std::future::Future;
 use std::io;
 use std::pin::Pin;
+use std::task::Poll;
 
 use byteorder::{ByteOrder, NetworkEndian};
+use futures::poll;
 use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tacacs_plus_protocol::{Deserialize, PacketBody, Serialize};
 use tacacs_plus_protocol::{HeaderInfo, Packet, PacketFlags};
 
 use super::ClientError;
 
+#[cfg(test)]
+mod tests;
+
 /// A (pinned, boxed) future that returns a client connection or an error, as returned from a [`ConnectionFactory`].
 ///
 /// This is roughly equivalent to the [`BoxFuture`](futures::future::BoxFuture) type in the `futures` crate, but without
@@ -119,11 +124,28 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientInner<S> {
         Ok(conn)
     }
 
-    /// Writes a packet to the underlying connection.
+    /// Writes a packet to the underlying connection, reconnecting if necessary.
     pub(super) async fn send_packet<B: PacketBody + Serialize>(
         &mut self,
         packet: Packet<B>,
         secret_key: Option<&[u8]>,
     ) -> Result<(), ClientError> {
+        // check if other end closed our connection, and reopen it accordingly
+        let connection = self.connection().await?;
+        if !is_connection_open(connection).await? {
+            self.post_session_cleanup(true).await?;
+        }
+
+        // send the packet after ensuring the connection is valid (or dropping
+        // it if it's invalid)
+        self._send_packet(packet, secret_key).await
+    }
+
+    /// Writes a packet to the underlying connection.
+    async fn _send_packet<B: PacketBody + Serialize>(
+        &mut self,
+        packet: Packet<B>,
+        secret_key: Option<&[u8]>,
+    ) -> Result<(), ClientError> {
         // allocate zero-filled buffer large enough to hold packet
         let mut packet_buffer = vec![0; packet.wire_size()];
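For orientation, the reconnect-on-send flow above can be sketched outside the client's types. This is a hypothetical, minimal version (`send_with_probe` and `open_connection` are made-up stand-ins for the client's internals and its connection factory; `is_connection_open` is the probe added at the bottom of this file):

```rust
use std::future::Future;
use std::io;

use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};

/// Probes the current connection before writing, dropping and reopening it
/// if the peer has closed it. The real client does the equivalent by calling
/// post_session_cleanup() to drop the dead connection and then letting
/// connection() lazily recreate it from the factory.
async fn send_with_probe<C, F, Fut>(
    conn: &mut Option<C>,
    open_connection: F,
    payload: &[u8],
) -> io::Result<()>
where
    C: AsyncRead + AsyncWrite + Unpin,
    F: Fn() -> Fut,
    Fut: Future<Output = io::Result<C>>,
{
    // discard the existing connection if the probe says it's dead
    if let Some(c) = conn.as_mut() {
        if !is_connection_open(c).await? {
            *conn = None;
        }
    }

    // (re)open if needed, then perform the write as before
    if conn.is_none() {
        *conn = Some(open_connection().await?);
    }
    conn.as_mut().unwrap().write_all(payload).await
}
```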
@@ -195,7 +217,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ClientInner<S> {
     pub(super) async fn post_session_cleanup(&mut self, status_is_error: bool) -> io::Result<()> {
         // close session if server doesn't agree to SINGLE_CONNECTION negotiation, or if an error occurred (since a mutex guarantees only one session is going at a time)
         if !self.single_connection_established || status_is_error {
-            // SAFETY: ensure_connection should be called before this function, and guarantees inner.connection is non-None
+            // SAFETY: connection() should be called before this function, and guarantees inner.connection is non-None
             let mut connection = self.connection.take().unwrap();
             connection.close().await?;
 
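As a side note, the pattern this SAFETY comment documents can be shown in isolation. A hypothetical sketch (`close_and_clear` is a made-up name) of why the connection is held in an `Option`: it lets the connection be moved out, closed, and left absent until the factory reopens it.

```rust
use std::io;

use futures::{AsyncWrite, AsyncWriteExt};

// Take the connection out of its slot and close it; panics if the slot is
// empty, which is exactly the invariant the SAFETY comment asserts
// (connection() must have populated the slot beforehand).
async fn close_and_clear<C: AsyncWrite + Unpin>(slot: &mut Option<C>) -> io::Result<()> {
    let mut conn = slot.take().expect("connection must be present");
    conn.close().await
}
```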
@@ -212,3 +234,48 @@
         Ok(())
     }
 }
+
+/// Checks if the provided connection is still open on both sides.
+///
+/// This is accomplished by attempting to read a single byte from the connection
+/// and checking for an EOF condition or specific errors (broken pipe/connection reset).
+///
+/// This might be overkill, but during testing I encountered a case where a write succeeded
+/// and a subsequent read hung due to the connection being closed on the other side, so
+/// avoiding that is preferable.
+async fn is_connection_open<C>(connection: &mut C) -> io::Result<bool>
+where
+    C: AsyncRead + Unpin,
+{
+    // read into a 1-byte buffer, since a 0-byte buffer might return 0 besides just on EOF
+    let mut buffer = [0];
+
+    // poll the read future exactly once to see if anything is ready immediately
+    match poll!(connection.read(&mut buffer)) {
+        // something ready on first poll likely indicates something wrong, since we aren't
+        // expecting any data to actually be ready
+        Poll::Ready(ready) => match ready {
+            // read of length 0 indicates an EOF, which happens when the other side closes a TCP connection
+            Ok(0) => Ok(false),
+
+            Err(e) => match e.kind() {
+                // these errors indicate that the connection is closed, which is the exact
+                // situation we're trying to recover from
+                //
+                // BrokenPipe seems to be Linux-specific (?), ConnectionReset is more general though
+                // (checked TCP & read(2) man pages for MacOS/FreeBSD/Linux)
+                io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset => Ok(false),
+
+                // bubble up any other errors to the caller
+                _ => Err(e),
+            },
+
+            // if there's data still available, the connection is still open, although
+            // this shouldn't happen in the context of TACACS+
+            Ok(1..) => Ok(true),

[review comment] nit: should be debug/trace log here? Would this be possible to hit in a multi-threaded context? Or would it hit the mutex lock?

[author reply] It should be prevented by the mutex since it's locked for the full session within Client.

+        },
+
+        // nothing ready to read -> connection is still open
+        Poll::Pending => Ok(true),
+    }
+}
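The probe's two interesting outcomes can also be exercised without real sockets. A minimal sketch (not part of this PR) using tokio's in-memory duplex pipe and the same `tokio-util` compat shim the tests below use:

```rust
#[cfg(test)]
mod probe_sketch {
    use tokio_util::compat::TokioAsyncReadCompatExt;

    use super::is_connection_open;

    #[tokio::test]
    async fn duplex_probe() {
        // both ends alive with nothing buffered: the single poll returns
        // Pending, so the connection is reported open
        let (client, server) = tokio::io::duplex(64);
        let mut client = client.compat();
        assert!(is_connection_open(&mut client).await.unwrap());

        // dropping the other end yields an immediate EOF (read of length 0),
        // so the connection is reported closed
        drop(server);
        assert!(!is_connection_open(&mut client).await.unwrap());
    }
}
```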
New file: the `tests` module declared above via `#[cfg(test)] mod tests;`.
@@ -0,0 +1,94 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::AsyncWriteExt;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::Notify;
+use tokio_util::compat::TokioAsyncReadCompatExt;
+
+use super::is_connection_open;
+
+async fn bind_to_port(port: u16) -> TcpListener {
+    TcpListener::bind(("localhost", port))
+        .await
+        .unwrap_or_else(|err| panic!("failed to bind to address localhost:{port}: {err:?}"))
+}
+
+#[tokio::test]
+async fn connection_open_check() {
+    let notify = Arc::new(Notify::new());
+    let listener_notify = notify.clone();
+
+    tokio::spawn(async move {
+        let listener = bind_to_port(9999).await;
+        listener_notify.notify_one();
+
+        let (_stream, _) = listener
+            .accept()
+            .await
+            .expect("failed to accept connection");
+
+        // this is done to keep the stream open for the rest of the test
+        listener_notify.notified().await;
+    });
+
+    // wait for server to bind to address
+    notify.notified().await;
+
+    let client = TcpStream::connect(("localhost", 9999))
+        .await
+        .expect("couldn't connect to test listener");
+    let mut client = client.compat();
+
+    let is_open = is_connection_open(&mut client)
+        .await
+        .expect("couldn't check if connection was open");
+    assert!(is_open);
+
+    notify.notify_one();
+}
+
+#[tokio::test]
+async fn connection_closed_check() {
+    let notify = Arc::new(Notify::new());
+    let listener_notify = notify.clone();
+
+    tokio::spawn(async move {
+        let listener = bind_to_port(9998).await;
+        listener_notify.notify_one();
+
+        let (stream, _) = listener
+            .accept()
+            .await
+            .expect("failed to accept connection");
+
+        let mut stream = stream.compat();
+
+        // close connection & notify main test task
+        stream.close().await.unwrap();
+
+        // wait for a bit before notifying main task; this caused some sporadic failures
+        // during testing when omitted
+        tokio::time::sleep(Duration::from_millis(250)).await;
+
+        // notify main task that stream is closed
+        listener_notify.notify_one();
+    });
+
+    // wait for server to bind to address
+    notify.notified().await;
+
+    let client = TcpStream::connect(("localhost", 9998))
+        .await
+        .expect("couldn't connect to test listener");
+    let mut client = client.compat();
+
+    // let server close connection
+    notify.notified().await;
+
+    // ensure connection is detected as closed
+    let is_open = is_connection_open(&mut client)
+        .await
+        .expect("couldn't check if connection was open");
+    assert!(!is_open);
+}
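One possible hardening, beyond the scope of this PR: the hardcoded ports 9999/9998 can collide with other processes or parallel test runs. Binding to port 0 lets the OS pick a free port, which can then be read back from the listener (the port would have to reach the client task over something like a `tokio::sync::oneshot` channel, since `Notify` carries no payload):

```rust
use tokio::net::TcpListener;

// Hypothetical replacement for bind_to_port that avoids fixed ports by
// binding to port 0 and reading back the OS-assigned port.
async fn bind_to_free_port() -> (TcpListener, u16) {
    let listener = TcpListener::bind(("localhost", 0))
        .await
        .expect("failed to bind to an ephemeral port");
    let port = listener
        .local_addr()
        .expect("listener has no local address")
        .port();
    (listener, port)
}
```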