Skip to content

Commit 350d1fc

Browse files
committed
[tunnel] Prepare for multi-conn routing and egress NAT
* Refactor Router interface in prep for adding ECMP routing. * Split implementation into client and server flavors - each side will use its own implementation. * Some minor naming refactor.
1 parent 812b765 commit 350d1fc

File tree

12 files changed

+158
-83
lines changed

12 files changed

+158
-83
lines changed

api/core/v1alpha/tunnel_types.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,21 @@ type TunnelNode struct {
2525
Status TunnelNodeStatus `json:"status,omitempty"`
2626
}
2727

28+
type EgressGatewaySpec struct {
29+
// Whether the egress gateway is enabled. Default is false.
30+
// When enabled, the egress gateway will be used to route traffic from the tunnel
31+
// node to the internet. Traffic will be SNAT'ed.
32+
// +optional
33+
Enabled bool `json:"enabled,omitempty"`
34+
}
35+
2836
type TunnelNodeSpec struct {
37+
// Configures Egress Gateway mode on the Tunnel Node. In this mode, the Tunnel
38+
// Node acts as a gateway for outbound connections originating from the
39+
// Agent side in addition to its default mode (where the connections arrive in the
40+
// direction of the Agent).
41+
// +optional
42+
EgressGateway *EgressGatewaySpec `json:"egressGateway,omitempty"`
2943
}
3044

3145
type AgentStatus struct {

pkg/tunnel/client.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,10 @@ func (c *TunnelClient) Start(ctx context.Context) error {
278278
}
279279

280280
if c.options.mode == TunnelClientModeKernel {
281-
c.router, err = router.NewNetlinkRouter(routerOpts...)
282-
if err != nil {
283-
return fmt.Errorf("failed to create kernel router: %w", err)
284-
}
281+
//c.router, err = router.NewNetlinkRouter(routerOpts...)
282+
//if err != nil {
283+
//return fmt.Errorf("failed to create kernel router: %w", err)
284+
//}
285285
} else if c.options.mode == TunnelClientModeUser {
286286
c.router, err = router.NewNetstackRouter(routerOpts...)
287287
if err != nil {
@@ -298,9 +298,8 @@ func (c *TunnelClient) Start(ctx context.Context) error {
298298
for _, prefix := range route.Prefixes() {
299299
slog.Info("Adding route", slog.String("prefix", prefix.String()))
300300

301-
_, _, err := c.router.AddPeer(prefix, c.conn)
302-
if err != nil {
303-
return fmt.Errorf("failed to add peer route %s: %w", prefix.String(), err)
301+
if err := c.router.Add(prefix, c.conn); err != nil {
302+
return fmt.Errorf("failed to add route %s: %w", prefix.String(), err)
304303
}
305304
}
306305
}

pkg/tunnel/connection/conn.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package connection
2+
3+
import "io"
4+
5+
// Connection is a simple interface implemented by connect-ip-go and custom
6+
// connection types.
7+
type Connection interface {
8+
io.Closer
9+
10+
ReadPacket([]byte) (int, error)
11+
WritePacket([]byte) ([]byte, error)
12+
}

pkg/tunnel/connection/muxed_connection.go renamed to pkg/tunnel/connection/muxed_conn.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package connection
33
import (
44
"errors"
55
"fmt"
6-
"io"
76
"log/slog"
87
"net"
98
"net/netip"
@@ -15,19 +14,8 @@ import (
1514
"github.com/apoxy-dev/apoxy/pkg/netstack"
1615
)
1716

18-
// Connection is a simple interface implemented by connect-ip-go and custom
19-
// connection types.
20-
type Connection interface {
21-
io.Closer
22-
ReadPacket([]byte) (int, error)
23-
WritePacket([]byte) ([]byte, error)
24-
}
25-
26-
var _ Connection = (*MuxedConnection)(nil)
27-
28-
// MuxedConnection is a connection that multiplexes multiple downstream
29-
// connections over a single virtual connection.
30-
type MuxedConnection struct {
17+
// MuxedConn multiplexes multiple connection.Conn objects.
18+
type MuxedConn struct {
3119
// Maps tunnel destination address to CONNECT-IP connection.
3220
conns *triemap.TrieMap[Connection]
3321
incomingPackets chan *[]byte
@@ -37,8 +25,9 @@ type MuxedConnection struct {
3725
closed atomic.Bool
3826
}
3927

40-
func NewMuxedConnection() *MuxedConnection {
41-
return &MuxedConnection{
28+
// NewMuxedConn creates a new muxedConn.
29+
func NewMuxedConn() *MuxedConn {
30+
return &MuxedConn{
4231
conns: triemap.New[Connection](),
4332
incomingPackets: make(chan *[]byte, 100),
4433
packetBufferPool: sync.Pool{
@@ -50,7 +39,7 @@ func NewMuxedConnection() *MuxedConnection {
5039
}
5140
}
5241

53-
func (m *MuxedConnection) AddConnection(prefix netip.Prefix, conn Connection) {
42+
func (m *MuxedConn) AddConnection(prefix netip.Prefix, conn Connection) {
5443
if prefix.IsValid() && prefix.Addr().Is6() {
5544
m.conns.Insert(prefix, conn)
5645
go m.readPackets(conn)
@@ -59,7 +48,7 @@ func (m *MuxedConnection) AddConnection(prefix netip.Prefix, conn Connection) {
5948
}
6049
}
6150

62-
func (m *MuxedConnection) RemoveConnection(prefix netip.Prefix) error {
51+
func (m *MuxedConn) RemoveConnection(prefix netip.Prefix) error {
6352
// Has the connection already been closed?
6453
if m.closed.Load() {
6554
// Then this becomes a no-op.
@@ -85,7 +74,7 @@ func (m *MuxedConnection) RemoveConnection(prefix netip.Prefix) error {
8574
return nil
8675
}
8776

88-
func (m *MuxedConnection) Prefixes() []netip.Prefix {
77+
func (m *MuxedConn) Prefixes() []netip.Prefix {
8978
var prefixes []netip.Prefix
9079
m.conns.ForEach(func(prefix netip.Prefix, value Connection) bool {
9180
prefixes = append(prefixes, prefix)
@@ -94,7 +83,7 @@ func (m *MuxedConnection) Prefixes() []netip.Prefix {
9483
return prefixes
9584
}
9685

97-
func (m *MuxedConnection) Close() error {
86+
func (m *MuxedConn) Close() error {
9887
var firstErr error
9988
m.closeOnce.Do(func() {
10089
// Close all connections in the map.
@@ -122,7 +111,7 @@ func (m *MuxedConnection) Close() error {
122111
return firstErr
123112
}
124113

125-
func (m *MuxedConnection) ReadPacket(pkt []byte) (int, error) {
114+
func (m *MuxedConn) ReadPacket(pkt []byte) (int, error) {
126115
p, ok := <-m.incomingPackets
127116
if !ok {
128117
return 0, net.ErrClosed
@@ -138,7 +127,7 @@ func (m *MuxedConnection) ReadPacket(pkt []byte) (int, error) {
138127
return n, nil
139128
}
140129

141-
func (m *MuxedConnection) WritePacket(pkt []byte) ([]byte, error) {
130+
func (m *MuxedConn) WritePacket(pkt []byte) ([]byte, error) {
142131
slog.Debug("Write packet to connection", slog.Int("bytes", len(pkt)))
143132

144133
var dstIP netip.Addr
@@ -170,7 +159,7 @@ func (m *MuxedConnection) WritePacket(pkt []byte) ([]byte, error) {
170159
return conn.WritePacket(pkt)
171160
}
172161

173-
func (m *MuxedConnection) readPackets(conn Connection) {
162+
func (m *MuxedConn) readPackets(conn Connection) {
174163
for {
175164
pkt := m.packetBufferPool.Get().(*[]byte)
176165

pkg/tunnel/connection/muxed_connection_test.go renamed to pkg/tunnel/connection/muxed_conn_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (m *MockConnection) Close() error {
4040

4141
func TestMuxedConnection(t *testing.T) {
4242
t.Run("Add and Remove Connection", func(t *testing.T) {
43-
mux := connection.NewMuxedConnection()
43+
mux := connection.NewMuxedConn()
4444
mockConn := new(MockConnection)
4545
mockConn.On("ReadPacket", mock.Anything).Return(0, []byte{}, nil).Maybe()
4646
mockConn.On("Close").Return(nil).Once()
@@ -56,14 +56,14 @@ func TestMuxedConnection(t *testing.T) {
5656
})
5757

5858
t.Run("Remove Connection - Invalid Prefix", func(t *testing.T) {
59-
mux := connection.NewMuxedConnection()
59+
mux := connection.NewMuxedConn()
6060
prefix := netip.MustParsePrefix("192.0.2.0/24")
6161
err := mux.RemoveConnection(prefix)
6262
assert.Error(t, err)
6363
})
6464

6565
t.Run("WritePacket - Success", func(t *testing.T) {
66-
mux := connection.NewMuxedConnection()
66+
mux := connection.NewMuxedConn()
6767
mockConn := new(MockConnection)
6868
mockConn.On("ReadPacket", mock.Anything).Return(0, []byte{}, nil).Maybe()
6969

@@ -83,7 +83,7 @@ func TestMuxedConnection(t *testing.T) {
8383
})
8484

8585
t.Run("WritePacket - No Connection Found", func(t *testing.T) {
86-
mux := connection.NewMuxedConnection()
86+
mux := connection.NewMuxedConn()
8787

8888
pkt := make([]byte, 40)
8989
pkt[0] = 0x60
@@ -95,7 +95,7 @@ func TestMuxedConnection(t *testing.T) {
9595
})
9696

9797
t.Run("ReadPacket - Success", func(t *testing.T) {
98-
mux := connection.NewMuxedConnection()
98+
mux := connection.NewMuxedConn()
9999
mockConn := new(MockConnection)
100100

101101
expected := []byte("hello")
@@ -115,7 +115,7 @@ func TestMuxedConnection(t *testing.T) {
115115
})
116116

117117
t.Run("ReadPacket - Closed Channel", func(t *testing.T) {
118-
mux := connection.NewMuxedConnection()
118+
mux := connection.NewMuxedConn()
119119
_ = mux.Close()
120120

121121
buf := make([]byte, 1500)
@@ -125,7 +125,7 @@ func TestMuxedConnection(t *testing.T) {
125125
})
126126

127127
t.Run("Close - All Connections", func(t *testing.T) {
128-
mux := connection.NewMuxedConnection()
128+
mux := connection.NewMuxedConn()
129129
mockConn := new(MockConnection)
130130
mockConn.On("ReadPacket", mock.Anything).Return(0, []byte{}, nil).Maybe()
131131
mockConn.On("Close").Return(nil).Once()

pkg/tunnel/router/netstack.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ var (
2222
)
2323

2424
// NetstackRouter implements Router using a userspace network stack.
25+
// This router can be used for both client and server sides.
2526
type NetstackRouter struct {
2627
tunDev *netstack.TunDevice
27-
mux *connection.MuxedConnection
28+
mux *connection.MuxedConn
2829
proxy *socksproxy.ProxyServer
2930
localAddresses []netip.Prefix
3031
resolveConf *network.ResolveConfig
@@ -48,7 +49,7 @@ func NewNetstackRouter(opts ...Option) (*NetstackRouter, error) {
4849

4950
return &NetstackRouter{
5051
tunDev: tunDev,
51-
mux: connection.NewMuxedConnection(),
52+
mux: connection.NewMuxedConn(),
5253
proxy: proxy,
5354
localAddresses: options.localAddresses,
5455
resolveConf: options.resolveConf,
@@ -105,23 +106,42 @@ func (r *NetstackRouter) Start(ctx context.Context) error {
105106
return g.Wait()
106107
}
107108

108-
// AddPeer adds a peer route to the tunnel.
109-
func (r *NetstackRouter) AddPeer(peer netip.Prefix, conn connection.Connection) (netip.Addr, []netip.Prefix, error) {
110-
r.mux.AddConnection(peer, conn)
111-
return peer.Addr(), r.localAddresses, nil
109+
// Add adds a dst route to the tunnel.
110+
func (r *NetstackRouter) Add(dst netip.Prefix, conn connection.Connection) error {
111+
r.mux.AddConnection(dst, conn)
112+
return nil
113+
}
114+
115+
// Del removes a dst route from the tunnel.
116+
func (r *NetstackRouter) Del(dst netip.Prefix, _ string) error {
117+
return r.DelAll(dst) // TODO: implement multi-conn routes.
112118
}
113119

114-
// RemovePeer removes a peer route from the tunnel.
115-
func (r *NetstackRouter) RemovePeer(peer netip.Prefix) error {
116-
if err := r.mux.RemoveConnection(peer); err != nil {
120+
// DelAll removes all routes for the dst.
121+
func (r *NetstackRouter) DelAll(dst netip.Prefix) error {
122+
if err := r.mux.RemoveConnection(dst); err != nil {
117123
slog.Error("failed to remove connection", slog.Any("error", err))
118124
}
119125

120126
return nil
121127
}
122128

129+
// ListRoutes returns a list of all routes in the tunnel.
130+
func (r *NetstackRouter) ListRoutes() ([]TunnelRoute, error) {
131+
ps := r.mux.Prefixes()
132+
rts := make([]TunnelRoute, 0, len(ps))
133+
for _, p := range ps {
134+
rts = append(rts, TunnelRoute{
135+
Dst: p,
136+
// TODO: Add connID,
137+
State: TunnelRouteStateActive,
138+
})
139+
}
140+
return rts, nil
141+
}
142+
123143
// GetMuxedConnection returns the muxed connection for adding/removing connections.
124-
func (r *NetstackRouter) GetMuxedConnection() *connection.MuxedConnection {
144+
func (r *NetstackRouter) GetMuxedConnection() *connection.MuxedConn {
125145
return r.mux
126146
}
127147

pkg/tunnel/router/netstack_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,11 @@ func TestNetstackRouter(t *testing.T) {
3636

3737
// Test AddPeer
3838
prefix := netip.MustParsePrefix("fd00::1/128")
39-
conn := connection.NewMuxedConnection()
40-
_, _, err = r.AddPeer(prefix, conn)
41-
require.NoError(t, err)
39+
conn := connection.NewMuxedConn()
40+
require.NoError(t, r.Add(prefix, conn))
4241

4342
// Test RemovePeer
44-
err = r.RemovePeer(prefix)
43+
err = r.DelAll(prefix)
4544
require.NoError(t, err)
4645

4746
// Test Close

pkg/tunnel/router/router.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,19 @@ import (
88
"github.com/apoxy-dev/apoxy/pkg/tunnel/connection"
99
)
1010

11+
type TunnelRouteState int
12+
13+
const (
14+
TunnelRouteStateActive TunnelRouteState = iota
15+
TunnelRouteStateDraining
16+
)
17+
18+
type TunnelRoute struct {
19+
Dst netip.Prefix
20+
TunID string
21+
State TunnelRouteState
22+
}
23+
1124
// Router is an interface for managing tunnel routing.
1225
type Router interface {
1326
io.Closer
@@ -16,12 +29,24 @@ type Router interface {
1629
// It's a blocking call that should be run in a separate goroutine.
1730
Start(ctx context.Context) error
1831

19-
// AddPeer adds a peer route to the tunnel. Returns the list of IP prefixes
20-
// to be advertised to the peer (if none returned, no prefixes should be advertised).
21-
AddPeer(peer netip.Prefix, conn connection.Connection) (privAddr netip.Addr, routes []netip.Prefix, err error)
32+
// AddRoute adds a dst prefix to be routed through the given tunnel connection.
33+
// If multiple tunnels are provided, the router will distribute traffic across them
34+
// using ECMP (Equal Cost Multi-Path Routing) hashing.
35+
Add(dst netip.Prefix, tun connection.Connection) error
36+
37+
// Del removes a routing associations for a given destination prefix and Connection name.
38+
// New matching flows will stop being routed through the tunnel immediately while
39+
// existing flows may continue to use the tunnel for some draining period before
40+
// getting re-routed via a different tunnel or dropped (if no tunnel is available for
41+
// the given dst).
42+
Del(dst netip.Prefix, name string) error
43+
44+
// DelAll removes all routing associations for a given destination prefix.
45+
// Connections will be drained same way as Del.
46+
DelAll(dst netip.Prefix) error
2247

23-
// RemovePeer removes a peer route from the tunnel identified by the given prefix.
24-
RemovePeer(peer netip.Prefix) error
48+
// ListRoutes returns a list of all routes currently managed by the router.
49+
ListRoutes() ([]TunnelRoute, error)
2550

2651
// LocalAddresses returns the list of local addresses that are assigned to the router.
2752
LocalAddresses() ([]netip.Prefix, error)

0 commit comments

Comments
 (0)