Skip to content

Commit 0fc60ce

Browse files
authored
ZMQ publisher (llm-d#119)
* ZMQ publisher Signed-off-by: Ira <IRAR@il.ibm.com> * Fix lint errors Signed-off-by: Ira <IRAR@il.ibm.com> * Fix lint errors Signed-off-by: Ira <IRAR@il.ibm.com> * Use sleep instead of ticker Signed-off-by: Ira <IRAR@il.ibm.com> --------- Signed-off-by: Ira <IRAR@il.ibm.com>
1 parent f96e9e6 commit 0fc60ce

File tree

8 files changed

+272
-2
lines changed

8 files changed

+272
-2
lines changed

.github/workflows/ci-pr-checks.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ jobs:
2121
go-version: '1.24.0'
2222
cache-dependency-path: ./go.sum
2323

24+
- name: Install libzmq dependencies (kvcache/kvevents)
25+
run: |
26+
sudo apt-get update
27+
sudo apt-get install -y libzmq3-dev pkg-config
28+
29+
- name: Set PKG_CONFIG_PATH
30+
run: echo "PKG_CONFIG_PATH=/usr/lib/pkgconfig" >> $GITHUB_ENV
31+
2432
- name: Run lint checks
2533
uses: golangci/golangci-lint-action@v8
2634
with:

Dockerfile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ RUN go build -a -o bin/llm-d-inference-sim -ldflags="-extldflags '-L$(pwd)/lib'"
4141
FROM registry.access.redhat.com/ubi9/ubi-minimal:latest
4242

4343
WORKDIR /
44+
45+
# Install zeromq runtime library needed by the manager.
46+
# The final image is UBI9, so we need epel-release-9.
47+
USER root
48+
RUN microdnf install -y dnf && \
49+
dnf install -y 'https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm' && \
50+
dnf install -y zeromq
51+
4452
COPY --from=builder /workspace/bin/llm-d-inference-sim /app/llm-d-inference-sim
4553

4654
# USER 65532:65532

Makefile

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ format: ## Format Go source files
6363
@gofmt -l -w $(SRC)
6464

6565
.PHONY: test
66-
test: check-ginkgo download-tokenizer ## Run tests
66+
test: check-ginkgo download-tokenizer download-zmq ## Run tests
6767
@printf "\033[33;1m==== Running tests ====\033[0m\n"
6868
CGO_ENABLED=1 ginkgo -ldflags="$(LDFLAGS)" -v -r
6969

@@ -80,7 +80,7 @@ lint: check-golangci-lint ## Run lint
8080
##@ Build
8181

8282
.PHONY: build
83-
build: check-go download-tokenizer ##
83+
build: check-go download-tokenizer download-zmq
8484
@printf "\033[33;1m==== Building ====\033[0m\n"
8585
go build -ldflags="$(LDFLAGS)" -o bin/$(PROJECT_NAME) cmd/$(PROJECT_NAME)/main.go
8686

@@ -193,3 +193,35 @@ print-project-name: ## Print the current project name
193193
.PHONY: install-hooks
194194
install-hooks: ## Install git hooks
195195
git config core.hooksPath hooks
196+
197+
##@ ZMQ Setup
198+
199+
.PHONY: download-zmq
200+
download-zmq: ## Install ZMQ dependencies based on OS/ARCH
201+
@echo "Checking if ZMQ is already installed..."
202+
@if pkg-config --exists libzmq; then \
203+
echo "✅ ZMQ is already installed."; \
204+
else \
205+
echo "Installing ZMQ dependencies..."; \
206+
if [ "$(TARGETOS)" = "linux" ]; then \
207+
if [ -x "$(command -v apt)" ]; then \
208+
apt update && apt install -y libzmq3-dev; \
209+
elif [ -x "$(command -v dnf)" ]; then \
210+
dnf install -y zeromq-devel; \
211+
else \
212+
echo "Unsupported Linux package manager. Install libzmq manually."; \
213+
exit 1; \
214+
fi; \
215+
elif [ "$(TARGETOS)" = "darwin" ]; then \
216+
if [ -x "$(command -v brew)" ]; then \
217+
brew install zeromq; \
218+
else \
219+
echo "Homebrew is not installed and is required to install zeromq. Install it from https://brew.sh/"; \
220+
exit 1; \
221+
fi; \
222+
else \
223+
echo "Unsupported OS: $(TARGETOS). Install libzmq manually - check https://zeromq.org/download/ for guidance."; \
224+
exit 1; \
225+
fi; \
226+
echo "✅ ZMQ dependencies installed."; \
227+
fi

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ require (
1212
github.com/onsi/ginkgo/v2 v2.23.4
1313
github.com/onsi/gomega v1.37.0
1414
github.com/openai/openai-go v0.1.0-beta.10
15+
github.com/pebbe/zmq4 v1.4.0
1516
github.com/prometheus/client_golang v1.22.0
1617
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
1718
github.com/spf13/pflag v1.0.6
1819
github.com/valyala/fasthttp v1.59.0
20+
github.com/vmihailenco/msgpack v4.0.4+incompatible
1921
gopkg.in/yaml.v3 v3.0.1
2022
k8s.io/klog/v2 v2.130.1
2123
)
@@ -34,6 +36,7 @@ require (
3436
github.com/go-openapi/swag v0.23.0 // indirect
3537
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
3638
github.com/gogo/protobuf v1.3.2 // indirect
39+
github.com/golang/protobuf v1.5.2 // indirect
3740
github.com/google/gnostic-models v0.6.9 // indirect
3841
github.com/google/go-cmp v0.7.0 // indirect
3942
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
@@ -64,6 +67,7 @@ require (
6467
golang.org/x/text v0.23.0 // indirect
6568
golang.org/x/time v0.9.0 // indirect
6669
golang.org/x/tools v0.31.0 // indirect
70+
google.golang.org/appengine v1.6.8 // indirect
6771
google.golang.org/protobuf v1.36.5 // indirect
6872
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
6973
gopkg.in/inf.v0 v0.9.1 // indirect

go.sum

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
4040
github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw=
4141
github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw=
4242
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
43+
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
44+
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
45+
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
46+
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
4347
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
4448
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
4549
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -86,6 +90,8 @@ github.com/openai/openai-go v0.1.0-beta.10/go.mod h1:g461MYGXEXBVdV5SaR/5tNzNbSf
8690
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
8791
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
8892
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
93+
github.com/pebbe/zmq4 v1.4.0 h1:gO5P92Ayl8GXpPZdYcD62Cwbq0slSBVVQRIXwGSJ6eQ=
94+
github.com/pebbe/zmq4 v1.4.0/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
8995
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
9096
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
9197
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
@@ -180,6 +186,49 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
180186
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
181187
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
182188
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
189+
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
190+
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
191+
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
192+
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
193+
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
194+
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
195+
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
196+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
197+
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
198+
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
199+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
200+
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
201+
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
202+
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
203+
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
204+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
205+
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
206+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
207+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
208+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
209+
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
210+
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
211+
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
212+
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
213+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
214+
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
215+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
216+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
217+
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
218+
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
219+
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
220+
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
221+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
222+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
223+
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
224+
golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU=
225+
golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ=
226+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
227+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
228+
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
229+
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
230+
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
231+
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
183232
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
184233
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
185234
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

pkg/common/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ func ParseCommandParamsAndLoadConfig() (*Configuration, error) {
313313
f.IntVar(&config.MinToolCallArrayParamLength, "min-tool-call-array-param-length", config.MinToolCallArrayParamLength, "Minimum possible length of array parameters in a tool call")
314314
f.IntVar(&config.ToolCallNotRequiredParamProbability, "tool-call-not-required-param-probability", config.ToolCallNotRequiredParamProbability, "Probability to add a parameter, that is not required, in a tool call")
315315
f.IntVar(&config.ObjectToolCallNotRequiredParamProbability, "object-tool-call-not-required-field-probability", config.ObjectToolCallNotRequiredParamProbability, "Probability to add a field, that is not required, in an object in a tool call")
316+
f.BoolVar(&config.EnableKVCache, "enable-kvcache", config.EnableKVCache, "Defines if KV cache feature is enabled")
316317

317318
// These values were manually parsed above in getParamValueFromArgs, we leave this in order to get these flags in --help
318319
var dummyString string

pkg/common/publisher.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright 2025 The llm-d-inference-sim Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package common
18+
19+
import (
20+
"context"
21+
"encoding/binary"
22+
"errors"
23+
"fmt"
24+
"sync/atomic"
25+
26+
zmq "github.com/pebbe/zmq4"
27+
"github.com/vmihailenco/msgpack"
28+
"k8s.io/klog/v2"
29+
)
30+
31+
// Publisher sends events to a ZMQ endpoint.
32+
type Publisher struct {
33+
socket *zmq.Socket
34+
endpoint string
35+
seqNum uint64
36+
}
37+
38+
// NewPublisher creates a new ZMQ publisher.
39+
// endpoint is the ZMQ address to bind to (e.g., "tcp://*:5557").
40+
func NewPublisher(endpoint string) (*Publisher, error) {
41+
socket, err := zmq.NewSocket(zmq.PUB)
42+
if err != nil {
43+
return nil, fmt.Errorf("failed to create ZMQ PUB socket: %w", err)
44+
}
45+
46+
if err := socket.Connect(endpoint); err != nil {
47+
errClose := socket.Close()
48+
return nil, errors.Join(
49+
fmt.Errorf("failed to connect to %s: %w", endpoint, err),
50+
errClose,
51+
)
52+
}
53+
54+
return &Publisher{
55+
socket: socket,
56+
endpoint: endpoint,
57+
}, nil
58+
}
59+
60+
// PublishEvent publishes a KV cache event batch to the ZMQ topic.
61+
// topic should include the pod identifier (e.g., "kv.pod1").
62+
func (p *Publisher) PublishEvent(ctx context.Context, topic string, batch interface{}) error {
63+
logger := klog.FromContext(ctx).V(0)
64+
65+
payload, err := msgpack.Marshal(batch)
66+
if err != nil {
67+
return fmt.Errorf("failed to marshal event batch: %w", err)
68+
}
69+
70+
// sequence number for ordering
71+
seq := atomic.AddUint64(&p.seqNum, 1)
72+
seqBytes := make([]byte, 8)
73+
binary.BigEndian.PutUint64(seqBytes, seq)
74+
75+
// send topic, sequence, payload
76+
if _, err := p.socket.SendMessage(topic, seqBytes, payload); err != nil {
77+
return fmt.Errorf("failed to send message to topic %s: %w", topic, err)
78+
}
79+
80+
logger.Info("Published event batch", "topic", topic, "seq", seq)
81+
return nil
82+
}
83+
84+
// Close closes the publisher and cleans up resources.
85+
func (p *Publisher) Close() error {
86+
if p.socket != nil {
87+
return p.socket.Close()
88+
}
89+
return nil
90+
}

pkg/common/publisher_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright 2025 The llm-d-inference-sim Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package common
18+
19+
import (
20+
"context"
21+
"encoding/binary"
22+
23+
"time"
24+
25+
. "github.com/onsi/ginkgo/v2"
26+
. "github.com/onsi/gomega"
27+
zmq "github.com/pebbe/zmq4"
28+
"github.com/vmihailenco/msgpack"
29+
)
30+
31+
const (
32+
topic = "test-topic"
33+
endpoint = "tcp://localhost:5557"
34+
data = "Hello"
35+
)
36+
37+
var _ = Describe("Publisher", func() {
38+
It("should publish and receive correct message", func() {
39+
zctx, err := zmq.NewContext()
40+
Expect(err).NotTo(HaveOccurred())
41+
sub, err := zctx.NewSocket(zmq.SUB)
42+
Expect(err).NotTo(HaveOccurred())
43+
err = sub.Bind(endpoint)
44+
Expect(err).NotTo(HaveOccurred())
45+
err = sub.SetSubscribe(topic)
46+
Expect(err).NotTo(HaveOccurred())
47+
48+
time.Sleep(100 * time.Millisecond)
49+
50+
pub, err := NewPublisher(endpoint)
51+
Expect(err).NotTo(HaveOccurred())
52+
53+
ctx, cancel := context.WithCancel(context.Background())
54+
defer cancel()
55+
56+
go func() {
57+
// Make sure that sub.RecvMessageBytes is called before pub.PublishEvent
58+
time.Sleep(time.Second)
59+
err := pub.PublishEvent(ctx, topic, data)
60+
Expect(err).NotTo(HaveOccurred())
61+
}()
62+
63+
// The message should be [topic, seq, payload]
64+
parts, err := sub.RecvMessageBytes(0)
65+
Expect(err).NotTo(HaveOccurred())
66+
Expect(parts).To(HaveLen(3))
67+
68+
Expect(string(parts[0])).To(Equal(topic))
69+
70+
seq := binary.BigEndian.Uint64(parts[1])
71+
Expect(seq).To(Equal(uint64(1)))
72+
73+
var payload string
74+
err = msgpack.Unmarshal(parts[2], &payload)
75+
Expect(err).NotTo(HaveOccurred())
76+
Expect(payload).To(Equal(data))
77+
})
78+
})

0 commit comments

Comments
 (0)