Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
06f7ea5
Initial commit
zainkabani Oct 13, 2023
1cd957e
Cleanup and add stats
zainkabani Oct 13, 2023
af6c2ea
Use an arc instead of full clones to store the parse packets
zainkabani Oct 13, 2023
2247f86
Use mutex instead
zainkabani Oct 13, 2023
d4d88c4
Merge branch 'main' into zain/reimplment-prepared-statements-with-glo…
zainkabani Oct 13, 2023
d2927d0
fmt
zainkabani Oct 13, 2023
fb23e33
clippy
zainkabani Oct 13, 2023
2fb3d4a
fmt
zainkabani Oct 13, 2023
a59aa63
fix?
zainkabani Oct 13, 2023
e2963e9
fix?
zainkabani Oct 13, 2023
8111582
fmt
zainkabani Oct 13, 2023
6e85fb2
typo
zainkabani Oct 13, 2023
46c8f9e
Update docs
zainkabani Oct 14, 2023
9a80b47
Refactor custom protocol
zainkabani Oct 14, 2023
19d8478
fmt
zainkabani Oct 14, 2023
a07874a
move custom protocol handling to before parsing
zainkabani Oct 14, 2023
bf5a39c
Support describe
zainkabani Oct 14, 2023
6a87a68
Add LRU for server side statement cache
zainkabani Oct 14, 2023
b528b95
rename variable
zainkabani Oct 15, 2023
0177af8
Refactoring
zainkabani Oct 15, 2023
cd7942b
Move docs
zainkabani Oct 15, 2023
6205548
Fix test
zainkabani Oct 15, 2023
9cd675e
fix
zainkabani Oct 15, 2023
89e2651
Update tests
zainkabani Oct 16, 2023
d37514f
trigger build
zainkabani Oct 16, 2023
bcce2d5
Add more tests
zainkabani Oct 16, 2023
63aa0c7
Reorder handling sync
zainkabani Oct 17, 2023
e392607
Support when a named describe is sent along with Parse (go pgx) and e…
zainkabani Oct 17, 2023
53880f2
don't talk to client if not needed when client sends Parse
zainkabani Oct 17, 2023
2842c85
fmt :(
zainkabani Oct 17, 2023
5604546
refactor tests
zainkabani Oct 17, 2023
6a1d7f6
nit
zainkabani Oct 17, 2023
a5d4bcf
Reduce hashing
zainkabani Oct 19, 2023
dd021c2
Reducing work done to decode describe and parse messages
zainkabani Oct 19, 2023
116a681
minor refactor
zainkabani Oct 19, 2023
72826e6
Merge branch 'main' into zain/reimplment-prepared-statements-with-glo…
zainkabani Oct 20, 2023
cfe8e9f
Merge branch 'main' into zain/reimplment-prepared-statements-with-glo…
zainkabani Oct 20, 2023
b27c918
Rewrite extended and prepared protocol message handling to better sup…
zainkabani Oct 21, 2023
d107bbe
An attempt to better handle if there are DDL changes that might break…
zainkabani Oct 21, 2023
21b9cde
fix
zainkabani Oct 21, 2023
d791f06
Minor stats fixed and cleanup
zainkabani Oct 21, 2023
a57550d
Cosmetic fixes (#64)
levkk Oct 23, 2023
db70499
Change server drop for statement cache error to a `deallocate all`
zainkabani Oct 23, 2023
7fa1147
Updated comments and added new idea for handling DDL changes impactin…
zainkabani Oct 23, 2023
005029d
fix test?
zainkabani Oct 23, 2023
6928d31
Revert test change
zainkabani Oct 23, 2023
b889b4b
trigger build, flakey test
zainkabani Oct 23, 2023
0dd5e88
Avoid potential race conditions by changing get_or_insert to promote …
zainkabani Oct 24, 2023
962090a
remove ps enabled variable on the server in favor of using an option
zainkabani Oct 24, 2023
80aa607
Add close to the Extended Protocol buffer
zainkabani Oct 24, 2023
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor custom protocol
  • Loading branch information
zainkabani committed Oct 14, 2023
commit 9a80b47a7b6b502df98e83fa41cdeafd1b7df25e
150 changes: 82 additions & 68 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use crate::config::{
get_config, get_idle_client_in_transaction_timeout, get_prepared_statements_cache_size,
Address, PoolMode,
};
use crate::constants::*;
use crate::messages::*;
use crate::plugins::PluginOutput;
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
use crate::query_router::{Command, QueryRouter};
use crate::server::{Server, ServerParameters};
use crate::stats::{ClientStats, ServerStats};
use crate::tls::Tls;
use crate::{constants::*, query_router};

use tokio_rustls::server::TlsStream;

Expand Down Expand Up @@ -1006,73 +1006,9 @@ where
let current_shard = query_router.shard();

// Handle all custom protocol commands, if any.
match query_router.try_execute_command(&message) {
// Normal query, not a custom command.
None => (),

// SET SHARD TO
Some((Command::SetShard, _)) => {
match query_router.shard() {
None => (),
Some(selected_shard) => {
if selected_shard >= pool.shards() {
// Bad shard number, send error message to client.
query_router.set_shard(current_shard);

error_response(
&mut self.write,
&format!(
"shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)",
selected_shard,
pool.shards(),
current_shard,
),
)
.await?;
} else {
custom_protocol_response_ok(&mut self.write, "SET SHARD").await?;
}
}
}
continue;
}

// SET PRIMARY READS TO
Some((Command::SetPrimaryReads, _)) => {
custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?;
continue;
}

// SET SHARDING KEY TO
Some((Command::SetShardingKey, _)) => {
custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?;
continue;
}

// SET SERVER ROLE TO
Some((Command::SetServerRole, _)) => {
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
continue;
}

// SHOW SERVER ROLE
Some((Command::ShowServerRole, value)) => {
show_response(&mut self.write, "server role", &value).await?;
continue;
}

// SHOW SHARD
Some((Command::ShowShard, value)) => {
show_response(&mut self.write, "shard", &value).await?;
continue;
}

// SHOW PRIMARY READS
Some((Command::ShowPrimaryReads, value)) => {
show_response(&mut self.write, "primary reads", &value).await?;
continue;
}
};
if self.handle_custom_protocol(&mut query_router, current_shard, &message, &pool).await? {
continue;
}

debug!("Waiting for connection from pool");
if !self.admin {
Expand Down Expand Up @@ -1606,6 +1542,84 @@ where
}
}

/// Handles custom protocol messages
/// Returns true if the message is custom protocol message, false otherwise
/// Does not work with prepared statements, only simple and extended protocol without parameters
async fn handle_custom_protocol(
&mut self,
query_router: &mut QueryRouter,
current_shard: Option<usize>,
message: &BytesMut,
pool: &ConnectionPool,
) -> Result<bool, Error> {
match query_router.try_execute_command(message) {
None => Ok(false),

Some(custom) => {
match custom {
// SET SHARD TO
(Command::SetShard, _) => {
match query_router.shard() {
None => {}
Some(selected_shard) => {
if selected_shard >= pool.shards() {
// Bad shard number, send error message to client.
query_router.set_shard(current_shard);

error_response(
&mut self.write,
&format!(
"shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)",
selected_shard,
pool.shards(),
current_shard,
),
)
.await?;
} else {
custom_protocol_response_ok(&mut self.write, "SET SHARD")
.await?;
}
}
}
}

// SET PRIMARY READS TO
(Command::SetPrimaryReads, _) => {
custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?;
}

// SET SHARDING KEY TO
(Command::SetShardingKey, _) => {
custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?;
}

// SET SERVER ROLE TO
(Command::SetServerRole, _) => {
custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?;
}

// SHOW SERVER ROLE
(Command::ShowServerRole, value) => {
show_response(&mut self.write, "server role", &value).await?;
}

// SHOW SHARD
(Command::ShowShard, value) => {
show_response(&mut self.write, "shard", &value).await?;
}

// SHOW PRIMARY READS
(Command::ShowPrimaryReads, value) => {
show_response(&mut self.write, "primary reads", &value).await?;
}
};

Ok(true)
}
}
}

/// Register and rewrite the parse statement to the clients statement cache
/// and also the pool's statement cache
fn rewrite_parse(
Expand Down