Skip to content

Commit 126c239

Browse files
committed
Abstract packetSender
1 parent 92e9703 commit 126c239

File tree

3 files changed

+65
-91
lines changed

3 files changed

+65
-91
lines changed

protocol/packet.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,54 @@ PACKET_RX_LOOP:
119119
}
120120
}
121121

122+
// Publish data over Serial interface.
123+
// We need to get an Ack before sending the next publish packet.
124+
// Resend same publish packet after timeout, and kill link after 5 retries.
125+
func (t *protocolTransport) packetSender(getData func() (Packet, error), onError func()) {
126+
defer t.session.Done()
127+
sequenceTxFlag := false
128+
retries := 0
129+
for {
130+
p, err := getData()
131+
if err != nil {
132+
if t.state == Connected {
133+
log.Printf("Error receiving data: %v. Disconnecting from Protocol partner\n", err)
134+
t.txBuff <- Packet{command: disconnect}
135+
if onError != nil {
136+
onError()
137+
}
138+
}
139+
return
140+
}
141+
if sequenceTxFlag {
142+
p.command |= 0x80
143+
}
144+
PUB_LOOP:
145+
for {
146+
t.txBuff <- p
147+
select {
148+
case ack, ok := <-t.acknowledgeEvent:
149+
if ok && ack == sequenceTxFlag {
150+
retries = 0
151+
sequenceTxFlag = !sequenceTxFlag
152+
break PUB_LOOP // success
153+
}
154+
case <-time.After(time.Millisecond * 500):
155+
retries++
156+
if retries >= 5 {
157+
log.Println("Too many tx serial retries. Disconnecting from Protocol partner")
158+
t.txBuff <- Packet{command: disconnect}
159+
if onError != nil {
160+
onError()
161+
}
162+
return
163+
}
164+
}
165+
}
166+
}
167+
}
168+
169+
// Generate tcp connection string used to dial tcp server from Protocol Client's connect packet payload.
122170
func makeTCPConnString(connPayload []byte) string {
123171
port := binary.LittleEndian.Uint16(connPayload[4:])
124172
connString := ""

protocol/protocolClient.go

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package protocol
22

33
import (
44
"bytes"
5+
"errors"
56
"log"
67
"time"
78
)
@@ -136,7 +137,15 @@ func (c *client) handleRxPacket(packet *Packet) {
136137
c.state = Connected
137138
c.acknowledgeEvent <- true
138139
c.session.Add(1)
139-
go c.packetSender()
140+
go c.packetSender(func() (p Packet, err error) {
141+
p, ok := <-c.userData_tx
142+
if ok {
143+
err = nil
144+
} else {
145+
err = errors.New("channel closed")
146+
}
147+
return
148+
}, c.Stop)
140149
// log.Println("Client: Connected")
141150
case disconnect:
142151
if c.state == Connected {
@@ -145,46 +154,3 @@ func (c *client) handleRxPacket(packet *Packet) {
145154
}
146155
}
147156
}
148-
149-
// Publish data from user app over Serial interface.
150-
// We need to get an Ack before sending the next publish packet.
151-
// Resend same publish packet after timeout, and kill link after 5 retries.
152-
func (c *client) packetSender() {
153-
defer c.session.Done()
154-
sequenceTxFlag := false
155-
retries := 0
156-
for {
157-
p, ok := <-c.userData_tx
158-
if !ok {
159-
if c.state == Connected {
160-
// log.Printf("Error receiving upstream: %v. Disconnecting client\n", err)
161-
c.txBuff <- Packet{command: disconnect}
162-
// c.dropLink()
163-
}
164-
return
165-
}
166-
if sequenceTxFlag {
167-
p.command |= 0x80
168-
}
169-
PUB_LOOP:
170-
for {
171-
c.txBuff <- p
172-
select {
173-
case ack, ok := <-c.acknowledgeEvent:
174-
if ok && ack == sequenceTxFlag {
175-
retries = 0
176-
sequenceTxFlag = !sequenceTxFlag
177-
break PUB_LOOP // success
178-
}
179-
case <-time.After(time.Millisecond * 500):
180-
retries++
181-
if retries >= 5 {
182-
log.Println("Too many downstream send retries. Disconnecting client")
183-
c.txBuff <- Packet{command: disconnect}
184-
c.Stop()
185-
return
186-
}
187-
}
188-
}
189-
}
190-
}

protocol/protocolGateway.go

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package protocol
33
import (
44
"log"
55
"net"
6-
"time"
76
)
87

98
// Implementation of the Protocol Gateway.
@@ -76,7 +75,13 @@ func (g *gateway) handleRxPacket(packet *Packet) {
7675

7776
// Start link session
7877
g.session.Add(1)
79-
go g.packetSender()
78+
tx := make([]byte, 512)
79+
go g.packetSender(func() (p Packet, err error) {
80+
// Publish data downstream received from upstream tcp server.
81+
n, err := g.uStream.Read(tx)
82+
p = Packet{command: publish, payload: tx[:n]}
83+
return
84+
}, g.dropLink)
8085
// log.Printf("Gateway: Connected to %v\n", dstStr)
8186
g.state = Connected
8287
g.txBuff <- Packet{command: connack}
@@ -88,51 +93,6 @@ func (g *gateway) handleRxPacket(packet *Packet) {
8893
}
8994
}
9095

91-
// Publish data downstream received from upstream tcp server.
92-
// We need to get an Ack before sending the next publish packet.
93-
// Resend same publish packet after timeout, and kill link after 5 retries.
94-
func (g *gateway) packetSender() {
95-
defer g.session.Done()
96-
sequenceTxFlag := false
97-
retries := 0
98-
tx := make([]byte, 512)
99-
for {
100-
nRx, err := g.uStream.Read(tx)
101-
if err != nil {
102-
if g.state == Connected {
103-
log.Printf("Error receiving upstream: %v. Disconnecting client\n", err)
104-
g.txBuff <- Packet{command: disconnect}
105-
g.dropLink()
106-
}
107-
return
108-
}
109-
p := Packet{command: publish, payload: tx[:nRx]}
110-
if sequenceTxFlag {
111-
p.command |= 0x80
112-
}
113-
PUB_LOOP:
114-
for {
115-
g.txBuff <- p
116-
select {
117-
case ack, ok := <-g.acknowledgeEvent:
118-
if ok && ack == sequenceTxFlag {
119-
retries = 0
120-
sequenceTxFlag = !sequenceTxFlag
121-
break PUB_LOOP // success
122-
}
123-
case <-time.After(time.Millisecond * 500):
124-
retries++
125-
if retries >= 5 {
126-
log.Println("Too many downstream send retries. Disconnecting client")
127-
g.txBuff <- Packet{command: disconnect}
128-
g.dropLink()
129-
return
130-
}
131-
}
132-
}
133-
}
134-
}
135-
13696
// End link session between upstream server and downstream client.
13797
func (g *gateway) dropLink() {
13898
if g.uStream != nil {

0 commit comments

Comments
 (0)