Skip to content
3 changes: 2 additions & 1 deletion cspell.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"Millis",
"paho",
"jsoniter",
"subresource"
"subresource",
"streamingkey"
]
}
16 changes: 12 additions & 4 deletions pkg/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ func (c *client) GetTopic(reqPath string) (*Topic, bool) {
}

func (c *client) Subscribe(reqPath string) *Topic {
// Check if there's already a topic with this exact key (reqPath)
if existingTopic, ok := c.topics.Load(reqPath); ok {
return existingTopic
}

chunks := strings.Split(reqPath, "/")
if len(chunks) < 2 {
log.DefaultLogger.Error("Invalid path", "path", reqPath)
Expand All @@ -129,14 +134,16 @@ func (c *client) Subscribe(reqPath string) *Topic {
return nil
}

// For MQTT subscription, we only need the actual topic path (without streaming key)
// The streaming key is used for topic uniqueness in storage, but MQTT only cares about the topic path
topicPath := path.Join(chunks[1:]...)

// Create topic with the reqPath as the key for storage
// The actual topic components will be parsed when needed
t := &Topic{
Path: topicPath,
Interval: interval,
}
if t, ok := c.topics.Load(topicPath); ok {
return t
}

topic, err := decodeTopic(t.Path)
if err != nil {
Expand All @@ -153,7 +160,8 @@ func (c *client) Subscribe(reqPath string) *Topic {
}); token.Wait() && token.Error() != nil {
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
}
c.topics.Store(t)
// Store the topic using reqPath as the key (which includes streaming key)
c.topics.Map.Store(reqPath, t)
return t
}

Expand Down
281 changes: 281 additions & 0 deletions pkg/mqtt/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package mqtt

import (
"path"
"strings"
"testing"
"time"
)

// Mock client that implements our Client interface directly
type mockClient struct {
topics TopicMap
subscriptions map[string]bool
connected bool
}

func (m *mockClient) GetTopic(reqPath string) (*Topic, bool) {
return m.topics.Load(reqPath)
}

func (m *mockClient) IsConnected() bool {
return m.connected
}

func (m *mockClient) Subscribe(reqPath string) *Topic {
// Check if already exists
if existingTopic, ok := m.topics.Load(reqPath); ok {
return existingTopic
}

chunks := strings.Split(reqPath, "/")
if len(chunks) < 2 {
return nil
}
interval, err := time.ParseDuration(chunks[0])
if err != nil {
return nil
}

topicPath := path.Join(chunks[1:]...)
t := &Topic{
Path: topicPath,
Interval: interval,
Messages: []Message{},
}

// Track MQTT subscription (simplified for testing)
if m.subscriptions == nil {
m.subscriptions = make(map[string]bool)
}
// For testing, assume we decode the topic properly
m.subscriptions["test/topic"] = true

// Store with reqPath as key
m.topics.Map.Store(reqPath, t)
return t
}

func (m *mockClient) Unsubscribe(reqPath string) {
m.topics.Delete(reqPath)
}

func (m *mockClient) Dispose() {
// Clear all topics and subscriptions
m.topics = TopicMap{}
m.subscriptions = make(map[string]bool)
}

func (m *mockClient) HandleMessage(topicPath string, payload []byte) {
message := Message{
Timestamp: time.Now(),
Value: payload,
}
m.topics.AddMessage(topicPath, message)
}

func newMockClient() *mockClient {
return &mockClient{
topics: TopicMap{},
subscriptions: make(map[string]bool),
connected: true,
}
}

func TestClient_Subscribe_WithStreamingKey(t *testing.T) {
c := newMockClient()

tests := []struct {
name string
reqPath string
expectTopic bool
expectedPath string
}{
{
name: "subscribe with streaming key",
reqPath: "1s/dGVzdC90b3BpYw/user1/hash123/org456",
expectTopic: true,
expectedPath: "dGVzdC90b3BpYw/user1/hash123/org456",
},
{
name: "subscribe without streaming key",
reqPath: "5s/dGVzdC90b3BpYw",
expectTopic: true,
expectedPath: "dGVzdC90b3BpYw",
},
{
name: "invalid path - no interval",
reqPath: "invalid",
expectTopic: false,
},
{
name: "invalid interval",
reqPath: "invalid-interval/dGVzdC90b3BpYw",
expectTopic: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
topic := c.Subscribe(tt.reqPath)

if tt.expectTopic {
if topic == nil {
t.Errorf("Expected topic to be created, but got nil")
return
}

// Verify topic is stored with the correct key
storedTopic, found := c.topics.Load(tt.reqPath)
if !found {
t.Errorf("Expected topic to be stored with key %s", tt.reqPath)
}

if storedTopic.Path != tt.expectedPath {
t.Errorf("Expected topic path %s, got %s", tt.expectedPath, storedTopic.Path)
}
} else {
if topic != nil {
t.Errorf("Expected nil topic for invalid input, but got %v", topic)
}
}
})
}
}

func TestClient_Subscribe_Deduplication(t *testing.T) {
c := newMockClient()

reqPath := "1s/dGVzdC90b3BpYw/user1/hash123/org456"

// Subscribe first time
topic1 := c.Subscribe(reqPath)
if topic1 == nil {
t.Fatal("Expected topic to be created")
}

// Subscribe second time - should return same topic
topic2 := c.Subscribe(reqPath)
if topic2 == nil {
t.Fatal("Expected topic to be returned")
}

if topic1 != topic2 {
t.Error("Expected same topic instance for duplicate subscription")
}

// Verify only one topic is stored
count := 0
c.topics.Map.Range(func(key, value interface{}) bool {
count++
return true
})

if count != 1 {
t.Errorf("Expected 1 stored topic, got %d", count)
}
}

func TestClient_Subscribe_MultipleStreamingKeys(t *testing.T) {
c := newMockClient()

// Same MQTT topic, same interval, different streaming keys
reqPath1 := "1s/dGVzdC90b3BpYw/user1/hash123/org456"
reqPath2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456"
reqPath3 := "1s/dGVzdC90b3BpYw/user1/hash123/org789"

topic1 := c.Subscribe(reqPath1)
topic2 := c.Subscribe(reqPath2)
topic3 := c.Subscribe(reqPath3)

if topic1 == nil || topic2 == nil || topic3 == nil {
t.Fatal("Expected all topics to be created")
}

// All should be different instances
if topic1 == topic2 || topic1 == topic3 || topic2 == topic3 {
t.Error("Expected different topic instances for different streaming keys")
}

// Verify all three topics are stored separately
count := 0
c.topics.Map.Range(func(key, value interface{}) bool {
count++
return true
})

if count != 3 {
t.Errorf("Expected 3 stored topics, got %d", count)
}

// Verify each can be retrieved by its specific key
storedTopic1, found1 := c.GetTopic(reqPath1)
storedTopic2, found2 := c.GetTopic(reqPath2)
storedTopic3, found3 := c.GetTopic(reqPath3)

if !found1 || !found2 || !found3 {
t.Error("Expected all topics to be retrievable by their keys")
}

if storedTopic1 == storedTopic2 || storedTopic1 == storedTopic3 || storedTopic2 == storedTopic3 {
t.Error("Expected retrieved topics to be different instances")
}
}

func TestClient_GetTopic(t *testing.T) {
c := newMockClient()

reqPath := "2s/dGVzdC90b3BpYw/streaming/key/123"

// Topic doesn't exist yet
_, found := c.GetTopic(reqPath)
if found {
t.Error("Expected topic not to be found initially")
}

// Create topic
topic := c.Subscribe(reqPath)
if topic == nil {
t.Fatal("Expected topic to be created")
}

// Now it should be found
retrievedTopic, found := c.GetTopic(reqPath)
if !found {
t.Error("Expected topic to be found after subscription")
}

if retrievedTopic != topic {
t.Error("Expected retrieved topic to be the same instance")
}
}

func TestClient_MessageHandling_WithStreamingKeys(t *testing.T) {
c := newMockClient()

// Create topics with same MQTT path but different streaming keys
reqPath1 := "1s/dGVzdC90b3BpYw/user1/hash123/org456"
reqPath2 := "1s/dGVzdC90b3BpYw/user2/hash456/org456"

topic1 := c.Subscribe(reqPath1)
topic2 := c.Subscribe(reqPath2)

if topic1 == nil || topic2 == nil {
t.Fatal("Expected both topics to be created")
}

// Simulate MQTT message arrival
mqttTopicPath := "dGVzdC90b3BpYw/user1/hash123/org456" // This is what HandleMessage receives
c.HandleMessage(mqttTopicPath, []byte("test message"))

// Check that only the matching topic received the message
updatedTopic1, _ := c.GetTopic(reqPath1)
updatedTopic2, _ := c.GetTopic(reqPath2)

if len(updatedTopic1.Messages) != 1 {
t.Errorf("Expected 1 message in topic1, got %d", len(updatedTopic1.Messages))
}
if len(updatedTopic2.Messages) != 0 {
t.Errorf("Expected 0 messages in topic2, got %d", len(updatedTopic2.Messages))
}
}
2 changes: 1 addition & 1 deletion pkg/mqtt/testdata/array.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// | Labels: | Labels: |
// | Type: []time.Time | Type: []json.RawMessage |
// +-------------------------------+-------------------------+
// | 1969-12-31 19:00:00 -0500 EST | [1,2,3] |
// | 1970-01-01 02:00:00 +0200 EET | [1,2,3] |
// +-------------------------------+-------------------------+
//
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/testdata/bool.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// | Labels: | Labels: |
// | Type: []time.Time | Type: []*bool |
// +-------------------------------+---------------+
// | 1969-12-31 19:00:00 -0500 EST | true |
// | 1970-01-01 02:00:00 +0200 EET | true |
// +-------------------------------+---------------+
//
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/testdata/float.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 |
// +-------------------------------+------------------+
// | 1969-12-31 19:00:00 -0500 EST | 123.456 |
// | 1970-01-01 02:00:00 +0200 EET | 123.456 |
// +-------------------------------+------------------+
//
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/testdata/int.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 |
// +-------------------------------+------------------+
// | 1969-12-31 19:00:00 -0500 EST | 123 |
// | 1970-01-01 02:00:00 +0200 EET | 123 |
// +-------------------------------+------------------+
//
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/testdata/nested-object.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// | Labels: | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 | Type: []json.RawMessage |
// +-------------------------------+------------------+-------------------------+
// | 1969-12-31 19:00:00 -0500 EST | 1 | {"c":[1,2,3]} |
// | 1970-01-01 02:00:00 +0200 EET | 1 | {"c":[1,2,3]} |
// +-------------------------------+------------------+-------------------------+
//
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/testdata/null.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// | Labels: |
// | Type: []time.Time |
// +-------------------------------+
// | 1969-12-31 19:00:00 -0500 EST |
// | 1970-01-01 02:00:00 +0200 EET |
// +-------------------------------+
//
//
Expand Down
6 changes: 3 additions & 3 deletions pkg/mqtt/testdata/object-changing-type.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
// | Labels: | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 | Type: []*float64 |
// +-------------------------------+------------------+------------------+
// | 1969-12-31 19:00:00 -0500 EST | 1 | 2 |
// | 1969-12-31 19:01:00 -0500 EST | null | null |
// | 1969-12-31 19:02:00 -0500 EST | 3 | 4 |
// | 1970-01-01 02:00:00 +0200 EET | 1 | 2 |
// | 1970-01-01 02:01:00 +0200 EET | null | null |
// | 1970-01-01 02:02:00 +0200 EET | 3 | 4 |
// +-------------------------------+------------------+------------------+
//
//
Expand Down
Loading
Loading