Skip to content

Commit 54d0c69

Browse files
authored
Add Consistent Hashing Load Balancing Strategy (gatewayd-io#592)
* Added ConsistentHash * changed net conn into Iconnwrapper * added test cases for onsistentHash * fixed lint issues * replace RWMutex to Mutex and added test case for concurency access * added github.com/spaolacci/murmur3 into depguard
1 parent bfb0a81 commit 54d0c69

17 files changed

+385
-53
lines changed

.golangci.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ linters-settings:
6565
- "golang.org/x/text/cases"
6666
- "golang.org/x/text/language"
6767
- "github.com/redis/go-redis/v9"
68+
- "github.com/spaolacci/murmur3"
6869
test:
6970
files:
7071
- $test

cmd/run.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -910,16 +910,17 @@ var runCmd = &cobra.Command{
910910
// Can be used to send keepalive messages to the client.
911911
EnableTicker: cfg.EnableTicker,
912912
},
913-
Proxies: serverProxies,
914-
Logger: logger,
915-
PluginRegistry: pluginRegistry,
916-
PluginTimeout: conf.Plugin.Timeout,
917-
EnableTLS: cfg.EnableTLS,
918-
CertFile: cfg.CertFile,
919-
KeyFile: cfg.KeyFile,
920-
HandshakeTimeout: cfg.HandshakeTimeout,
921-
LoadbalancerStrategyName: cfg.LoadBalancer.Strategy,
922-
LoadbalancerRules: cfg.LoadBalancer.LoadBalancingRules,
913+
Proxies: serverProxies,
914+
Logger: logger,
915+
PluginRegistry: pluginRegistry,
916+
PluginTimeout: conf.Plugin.Timeout,
917+
EnableTLS: cfg.EnableTLS,
918+
CertFile: cfg.CertFile,
919+
KeyFile: cfg.KeyFile,
920+
HandshakeTimeout: cfg.HandshakeTimeout,
921+
LoadbalancerStrategyName: cfg.LoadBalancer.Strategy,
922+
LoadbalancerRules: cfg.LoadBalancer.LoadBalancingRules,
923+
LoadbalancerConsistentHash: cfg.LoadBalancer.ConsistentHash,
923924
},
924925
)
925926

config/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,14 @@ type LoadBalancingRule struct {
106106
Distribution []Distribution `json:"distribution"`
107107
}
108108

109+
type ConsistentHash struct {
110+
UseSourceIP bool `json:"useSourceIp"`
111+
}
112+
109113
type LoadBalancer struct {
110114
Strategy string `json:"strategy"`
111115
LoadBalancingRules []LoadBalancingRule `json:"loadBalancingRules"`
116+
ConsistentHash *ConsistentHash `json:"consistentHash,omitempty"`
112117
}
113118

114119
type Server struct {

gatewayd.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ servers:
8484
loadBalancer:
8585
# Load balancer strategies can be found in config/constants.go
8686
strategy: ROUND_ROBIN # ROUND_ROBIN, RANDOM, WEIGHTED_ROUND_ROBIN
87+
consistentHash:
88+
useSourceIp: true
8789
# Optional configuration for strategies that support rules (e.g., WEIGHTED_ROUND_ROBIN)
8890
# loadBalancingRules:
8991
# - condition: "DEFAULT" # Currently, only the "DEFAULT" condition is supported

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ require (
126126
github.com/shoenig/go-m1cpu v0.1.6 // indirect
127127
github.com/sirupsen/logrus v1.9.3 // indirect
128128
github.com/skeema/knownhosts v1.2.1 // indirect
129+
github.com/spaolacci/murmur3 v1.1.0 // indirect
129130
github.com/spf13/afero v1.11.0 // indirect
130131
github.com/spf13/pflag v1.0.5 // indirect
132+
github.com/stretchr/objx v0.5.2 // indirect
131133
github.com/tetratelabs/wazero v1.7.2 // indirect
132134
github.com/tklauser/go-sysconf v0.3.12 // indirect
133135
github.com/tklauser/numcpus v0.6.1 // indirect

go.sum

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

network/consistenthash.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package network
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"sync"
7+
8+
gerr "github.com/gatewayd-io/gatewayd/errors"
9+
"github.com/spaolacci/murmur3"
10+
)
11+
12+
// ConsistentHash implements a load balancing strategy based on consistent hashing.
13+
// It routes client connections to specific proxies by hashing the client's IP address or the full connection address.
14+
type ConsistentHash struct {
15+
originalStrategy LoadBalancerStrategy
16+
useSourceIP bool
17+
hashMap map[uint64]IProxy
18+
mu sync.Mutex
19+
}
20+
21+
// NewConsistentHash creates a new ConsistentHash instance. It requires a server configuration and an original
22+
// load balancing strategy. The consistent hash can use either the source IP or the full connection address
23+
// as the key for hashing.
24+
func NewConsistentHash(server *Server, originalStrategy LoadBalancerStrategy) *ConsistentHash {
25+
return &ConsistentHash{
26+
originalStrategy: originalStrategy,
27+
useSourceIP: server.LoadbalancerConsistentHash.UseSourceIP,
28+
hashMap: make(map[uint64]IProxy),
29+
}
30+
}
31+
32+
// NextProxy selects the appropriate proxy for a given client connection. It first tries to find an existing
33+
// proxy in the hash map based on the hashed key (either the source IP or the full address). If no match is found,
34+
// it falls back to the original load balancing strategy, adds the selected proxy to the hash map, and returns it.
35+
func (ch *ConsistentHash) NextProxy(conn IConnWrapper) (IProxy, *gerr.GatewayDError) {
36+
ch.mu.Lock()
37+
defer ch.mu.Unlock()
38+
39+
var key string
40+
41+
if ch.useSourceIP {
42+
sourceIP, err := extractIPFromConn(conn)
43+
if err != nil {
44+
return nil, gerr.ErrNoProxiesAvailable.Wrap(err)
45+
}
46+
key = sourceIP
47+
} else {
48+
key = conn.LocalAddr().String() // Fallback to use full address as the key if `useSourceIp` is false
49+
}
50+
51+
hash := hashKey(key)
52+
53+
proxy, exists := ch.hashMap[hash]
54+
55+
if exists {
56+
return proxy, nil
57+
}
58+
59+
// If no hash exists, fallback to the original strategy
60+
proxy, err := ch.originalStrategy.NextProxy(conn)
61+
if err != nil {
62+
return nil, gerr.ErrNoProxiesAvailable.Wrap(err)
63+
}
64+
65+
// Add the selected proxy to the hash map for future requests
66+
ch.hashMap[hash] = proxy
67+
68+
return proxy, nil
69+
}
70+
71+
// hashKey hashes a given key using the MurmurHash3 algorithm. It is used to generate consistent hash values
72+
// for IP addresses or connection strings.
73+
func hashKey(key string) uint64 {
74+
return murmur3.Sum64([]byte(key))
75+
}
76+
77+
// extractIPFromConn extracts the IP address from the connection's local address. It splits the address
78+
// into IP and port components and returns the IP part. This is useful for hashing based on the source IP.
79+
func extractIPFromConn(con IConnWrapper) (string, error) {
80+
addr := con.LocalAddr().String()
81+
// addr will be in the format "IP:port"
82+
ip, _, err := net.SplitHostPort(addr)
83+
if err != nil {
84+
return "", fmt.Errorf("failed to split host and port from address %s: %w", addr, err)
85+
}
86+
return ip, nil
87+
}

network/consistenthash_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package network
2+
3+
import (
4+
"net"
5+
"sync"
6+
"testing"
7+
8+
"github.com/gatewayd-io/gatewayd/config"
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
// TestNewConsistentHash verifies that a new ConsistentHash instance is properly created.
13+
// It checks that the original load balancing strategy is preserved, that the useSourceIp
14+
// setting is correctly applied, and that the hashMap is initialized.
15+
func TestNewConsistentHash(t *testing.T) {
16+
server := &Server{
17+
LoadbalancerConsistentHash: &config.ConsistentHash{UseSourceIP: true},
18+
}
19+
originalStrategy := NewRandom(server)
20+
consistentHash := NewConsistentHash(server, originalStrategy)
21+
22+
assert.NotNil(t, consistentHash)
23+
assert.Equal(t, originalStrategy, consistentHash.originalStrategy)
24+
assert.True(t, consistentHash.useSourceIP)
25+
assert.NotNil(t, consistentHash.hashMap)
26+
}
27+
28+
// TestConsistentHashNextProxyUseSourceIpExists ensures that when useSourceIp is enabled,
29+
// and the hashed IP exists in the hashMap, the correct proxy is returned.
30+
// It mocks a connection with a specific IP and verifies the proxy retrieval from the hashMap.
31+
func TestConsistentHashNextProxyUseSourceIpExists(t *testing.T) {
32+
proxies := []IProxy{
33+
MockProxy{name: "proxy1"},
34+
MockProxy{name: "proxy2"},
35+
MockProxy{name: "proxy3"},
36+
}
37+
server := &Server{
38+
Proxies: proxies,
39+
LoadbalancerConsistentHash: &config.ConsistentHash{UseSourceIP: true},
40+
}
41+
originalStrategy := NewRandom(server)
42+
consistentHash := NewConsistentHash(server, originalStrategy)
43+
mockConn := new(MockConnWrapper)
44+
45+
// Mock LocalAddr to return a specific IP:port format
46+
mockAddr := &net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 1234}
47+
mockConn.On("LocalAddr").Return(mockAddr)
48+
49+
key := "192.168.1.1"
50+
hash := hashKey(key)
51+
52+
consistentHash.hashMap[hash] = proxies[2]
53+
54+
proxy, err := consistentHash.NextProxy(mockConn)
55+
assert.Nil(t, err)
56+
assert.Equal(t, proxies[2], proxy)
57+
58+
// Clean up
59+
mockConn.AssertExpectations(t)
60+
}
61+
62+
// TestConsistentHashNextProxyUseFullAddress verifies the behavior when useSourceIp is disabled.
63+
// It ensures that the full connection address is used for hashing, and the correct proxy is returned
64+
// and cached in the hashMap. The test also checks that the hash value is computed based on the full address.
65+
func TestConsistentHashNextProxyUseFullAddress(t *testing.T) {
66+
mockConn := new(MockConnWrapper)
67+
proxies := []IProxy{
68+
MockProxy{name: "proxy1"},
69+
MockProxy{name: "proxy2"},
70+
MockProxy{name: "proxy3"},
71+
}
72+
server := &Server{
73+
Proxies: proxies,
74+
LoadbalancerConsistentHash: &config.ConsistentHash{
75+
UseSourceIP: false,
76+
},
77+
}
78+
mockStrategy := NewRoundRobin(server)
79+
80+
// Mock LocalAddr to return full address
81+
mockAddr := &net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 1234}
82+
mockConn.On("LocalAddr").Return(mockAddr)
83+
84+
consistentHash := NewConsistentHash(server, mockStrategy)
85+
86+
proxy, err := consistentHash.NextProxy(mockConn)
87+
assert.Nil(t, err)
88+
assert.NotNil(t, proxy)
89+
assert.Equal(t, proxies[1], proxy)
90+
91+
// Hash should be calculated using the full address and cached in hashMap
92+
hash := hashKey("192.168.1.1:1234")
93+
cachedProxy, exists := consistentHash.hashMap[hash]
94+
95+
assert.True(t, exists)
96+
assert.Equal(t, proxies[1], cachedProxy)
97+
98+
// Clean up
99+
mockConn.AssertExpectations(t)
100+
}
101+
102+
// TestConsistentHashNextProxyConcurrency tests the concurrency safety of the NextProxy method
103+
// in the ConsistentHash struct. It ensures that multiple goroutines can concurrently call
104+
// NextProxy without causing race conditions or inconsistent behavior.
105+
func TestConsistentHashNextProxyConcurrency(t *testing.T) {
106+
// Setup mocks
107+
conn1 := new(MockConnWrapper)
108+
conn2 := new(MockConnWrapper)
109+
proxies := []IProxy{
110+
MockProxy{name: "proxy1"},
111+
MockProxy{name: "proxy2"},
112+
MockProxy{name: "proxy3"},
113+
}
114+
server := &Server{
115+
Proxies: proxies,
116+
LoadbalancerConsistentHash: &config.ConsistentHash{UseSourceIP: true},
117+
}
118+
originalStrategy := NewRoundRobin(server)
119+
120+
// Mock IP addresses
121+
mockAddr1 := &net.TCPAddr{IP: net.ParseIP("192.168.1.1"), Port: 1234}
122+
mockAddr2 := &net.TCPAddr{IP: net.ParseIP("192.168.1.2"), Port: 1234}
123+
conn1.On("LocalAddr").Return(mockAddr1)
124+
conn2.On("LocalAddr").Return(mockAddr2)
125+
126+
// Initialize the ConsistentHash
127+
consistentHash := NewConsistentHash(server, originalStrategy)
128+
129+
// Run the test concurrently
130+
var waitGroup sync.WaitGroup
131+
const numGoroutines = 100
132+
133+
for range numGoroutines {
134+
waitGroup.Add(1)
135+
go func() {
136+
defer waitGroup.Done()
137+
p, err := consistentHash.NextProxy(conn1)
138+
assert.Nil(t, err)
139+
assert.Equal(t, proxies[1], p)
140+
}()
141+
}
142+
143+
waitGroup.Wait()
144+
145+
// Ensure that the proxy is consistently the same
146+
proxy, err := consistentHash.NextProxy(conn1)
147+
assert.Nil(t, err)
148+
assert.Equal(t, proxies[1], proxy)
149+
150+
// Ensure that connecting from a different address returns a different proxy
151+
proxy, err = consistentHash.NextProxy(conn2)
152+
assert.Nil(t, err)
153+
assert.Equal(t, proxies[2], proxy)
154+
}

network/loadbalancer.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,36 @@ import (
66
)
77

88
type LoadBalancerStrategy interface {
9-
NextProxy() (IProxy, *gerr.GatewayDError)
9+
NextProxy(conn IConnWrapper) (IProxy, *gerr.GatewayDError)
1010
}
1111

1212
// NewLoadBalancerStrategy returns a LoadBalancerStrategy based on the server's load balancer strategy name.
1313
// If the server's load balancer strategy is weighted round-robin,
1414
// it selects a load balancer rule before returning the strategy.
1515
// Returns an error if the strategy is not found or if there are no load balancer rules when required.
1616
func NewLoadBalancerStrategy(server *Server) (LoadBalancerStrategy, *gerr.GatewayDError) {
17+
var strategy LoadBalancerStrategy
1718
switch server.LoadbalancerStrategyName {
1819
case config.RoundRobinStrategy:
19-
return NewRoundRobin(server), nil
20+
strategy = NewRoundRobin(server)
2021
case config.RANDOMStrategy:
21-
return NewRandom(server), nil
22+
strategy = NewRandom(server)
2223
case config.WeightedRoundRobinStrategy:
2324
if server.LoadbalancerRules == nil {
2425
return nil, gerr.ErrNoLoadBalancerRules
2526
}
2627
loadbalancerRule := selectLoadBalancerRule(server.LoadbalancerRules)
27-
return NewWeightedRoundRobin(server, loadbalancerRule), nil
28+
strategy = NewWeightedRoundRobin(server, loadbalancerRule)
2829
default:
2930
return nil, gerr.ErrLoadBalancerStrategyNotFound
3031
}
32+
33+
// If consistent hashing is enabled, wrap the strategy
34+
if server.LoadbalancerConsistentHash != nil {
35+
strategy = NewConsistentHash(server, strategy)
36+
}
37+
38+
return strategy, nil
3139
}
3240

3341
// selectLoadBalancerRule selects and returns the first load balancer rule that matches the default condition.

0 commit comments

Comments
 (0)