Skip to content
Prev Previous commit
Next Next commit
Response and decoder refactor
  • Loading branch information
dkz2 committed May 30, 2023
commit 6ebbea21aa2ff2bcdd42fd5807efffac38551091
38 changes: 0 additions & 38 deletions Sources/SwiftMemcache/Extensions/UInt16+ResponseStatus.swift

This file was deleted.

26 changes: 21 additions & 5 deletions Sources/SwiftMemcache/MemcachedResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,28 @@
//
//===----------------------------------------------------------------------===//

import NIOCore
struct MemcachedResponse {
enum ReturnCode: UInt16 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to back this with the raw value right now

Suggested change
enum ReturnCode: UInt16 {
enum ReturnCode {
case stored
case notStored
case exists
case notFound

enum MemcachedResponse {
struct SetResponse {
let status: ResponseStatus
init(_ value: UInt16) {
switch value {
case 0x4844: // "HD"
self = .stored
case 0x4E53: // "NS"
self = .notStored
case 0x4558: // "EX"
self = .exists
case 0x4E46: // "NF"
self = .notFound
default:
preconditionFailure("Unrecognized response code.")
}
}
}

case set(SetResponse)
let returnCode: ReturnCode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this a var

Suggested change
let returnCode: ReturnCode
var returnCode: ReturnCode
}
84 changes: 63 additions & 21 deletions Sources/SwiftMemcache/MemcachedResponseDecoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,84 @@
import NIOCore
import NIOPosix

struct MemcachedResponseDecoder: ByteToMessageDecoder {
struct MemcachedResponseDecoder: NIOSingleStepByteToMessageDecoder {
/// Responses look like:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this to the doc of the type. So two lines above

///
/// <RC> <datalen*> <flag1> <flag2> <...>\r\n
///
/// Where <RC> is a 2 character return code. The number of flags returned are
/// based off of the flags supplied.
///
/// <datalen> is only for responses with payloads, with the return code 'VA'.
///
/// Flags are single character codes, ie 'q' or 'k' or 'I', which adjust the
/// behavior of the command. If a flag requests a response flag (ie 't' for TTL
/// remaining), it is returned in the same order as they were in the original
/// command, though this is not strict.
///
/// Flags are single character codes, ie 'q' or 'k' or 'O', which adjust the
/// behavior of a command. Flags may contain token arguments, which come after the
/// flag and before the next space or newline, ie 'Oopaque' or 'Kuserkey'. Flags
/// can return new data or reflect information, in the same order they were
/// supplied in the request. Sending an 't' flag with a get for an item with 20
/// seconds of TTL remaining, would return 't20' in the response.
///
/// All commands accept a tokens 'P' and 'L' which are completely ignored. The
/// arguments to 'P' and 'L' can be used as hints or path specifications to a
/// proxy or router inbetween a client and a memcached daemon. For example, a
/// client may prepend a "path" in the key itself: "mg /path/foo v" or in a proxy
/// token: "mg foo Lpath/ v" - the proxy may then optionally remove or forward the
/// token to a memcached daemon, which will ignore them.
///
/// Syntax errors are handled the same as noted under 'Error strings' section
/// below.
///
/// For usage examples beyond basic syntax, please see the wiki:
/// https://github.com/memcached/memcached/wiki/MetaCommands
typealias InboundOut = MemcachedResponse

var cumulationBuffer: ByteBuffer?

func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost right however you have to account for the fact that there are multiple responses inside the buffer. Furthermore, we are doing a bunch of duplicated work here if we get partial responses e.g. if we get everything except the \r\n we are going to re-parse the return code and flags every time. To avoid this we should introduce a little state machine to this decoder. Some rough code how this might look

 private enum NextStep: Hashable { /// The initial step. case returnCode /// Decode the data length, flags or check if we are the end case dataLengthOrFlag(MemcachedResponse.ReturnCode) /// Decode the next flag case decodeNextFlag(MemcachedResponse.ReturnCode , UInt64, [Flag]) /// Decode end of line case decodeEndOfLine(MemcachedResponse.ReturnCode, UInt64?, [Flag]) etc... } private enum NextDecodeAction { /// We need more bytes to decode the next step. case waitForMoreBytes /// We can continue decoding. case continueDecodeLoop /// We have decoded the next response and need to return it. case returnDecodedResponse(MemcachedResponse) } /// The next step in decoding. private var nextStep: NextStep = .decodeReturnCode public mutating func decode(buffer: inout ByteBuffer) throws -> MemcachedResponse? { while self.nextStep != .none { switch try self.next(buffer: &buffer) { case .returnDecodedResponse(let response): return response case .waitForMoreBytes: return nil case .continueDecodeLoop: () } } return nil } private mutating func next(buffer: inout ByteBuffer) throws -> NextDecodeAction { switch self.nextStep { case .decodeReturnCode: // Try to decode the return code and move the state case .decodeDataLengthOrFlag(let returnCode): // Check if the return code is `VA` then decode the data length otherwise move state to flag decoding } } public mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> MemcachedResponse? { // Try to decode what is left in the buffer. if let output = try self.decode(buffer: &buffer) { return output } guard buffer.readableBytes == 0 || seenEOF else { // If there are still readable bytes left and we haven't seen an EOF // then something is wrong with the message or how we called the decoder. throw SomEError } switch self.nextStep { // We have to decide what to do here. Some steps are fine to be in. } } 
// Ensure the buffer has at least 3 bytes (minimum for a response code and newline)
guard buffer.readableBytes >= 3 else {
return .needMoreData
return nil // Need more data
}

guard let firstResponseDigit = buffer.readInteger(as: UInt8.self),
let secondResponseDigit = buffer.readInteger(as: UInt8.self),
let responseStatus = ResponseStatus(asciiValues: (firstResponseDigit, secondResponseDigit)) else {
// Read the first two characters
guard let firstReturnCode = buffer.readInteger(as: UInt8.self),
let secondReturnCode = buffer.readInteger(as: UInt8.self) else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just read a single UInt16 here?

preconditionFailure("Response code could not be read.")
}

// Check if there's a whitespace character, this indicates flags are present
if buffer.readableBytes > 2, buffer.getInteger(at: buffer.readerIndex, as: UInt8.self) == UInt8.whitespace {
buffer.moveReaderIndex(forwardBy: 1)
let returnCode = MemcachedResponse.ReturnCode(
UInt16(firstReturnCode) << 8 | UInt16(secondReturnCode)
)

// -2 for \r\n
_ = buffer.readSlice(length: buffer.readableBytes - 2)
// If there is not a whitespace, then we are at the end of the line.
guard buffer.readableBytes > 0, let nextByte = buffer.getInteger(at: buffer.readerIndex, as: UInt8.self) else {
return nil // Need more dat
}

guard buffer.readInteger(as: UInt8.self) == UInt8.carriageReturn,
buffer.readInteger(as: UInt8.self) == UInt8.newline else {
preconditionFailure("Line ending '\r\n' not found after the flags.")
if nextByte != UInt8.whitespace {
// We're at the end of the line
buffer.moveReaderIndex(forwardBy: 1)
} else {
// We have additional data or flags to read
buffer.moveReaderIndex(forwardBy: 1)

// Assert that we really read \r\n
guard buffer.readableBytes >= 2,
buffer.getInteger(at: buffer.readerIndex, as: UInt8.self) == UInt8.carriageReturn,
buffer.getInteger(at: buffer.readerIndex + 1, as: UInt8.self) == UInt8.newline else {
preconditionFailure("Response ending '\r\n' not found.")
}

buffer.moveReaderIndex(forwardBy: 2)
}

let setResponse = MemcachedResponse.SetResponse(status: responseStatus)
context.fireChannelRead(self.wrapInboundOut(.set(setResponse)))
return .continue
return MemcachedResponse(returnCode: returnCode)
}

func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
return try self.decode(context: context, buffer: &buffer)
func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
return try self.decode(buffer: &buffer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,7 @@ final class MemcachedIntegrationTest: XCTestCase {
let response = try promise.futureResult.wait()

// Check the response from the server.
switch response {
case .set(let setResponse):
print("Response status: \(setResponse.status)")
}
print("Response return code: \(response.returnCode)")

} catch {
XCTFail("Failed to connect to Memcached server: \(error)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,40 @@ final class MemcachedResponseDecoderTests: XCTestCase {
XCTAssertNoThrow(try self.channel.finish())
}

func testDecodeSetResponse(responseCode: [UInt8], expectedResponseCode: ResponseStatus, expectedFlags: ByteBuffer? = nil) throws {
func testDecodeSetResponse(returnCode: [UInt8], expectedReturnCode: MemcachedResponse.ReturnCode) throws {
// Prepare a response buffer with a response code
var buffer = ByteBufferAllocator().buffer(capacity: 8)
buffer.writeBytes(responseCode)

buffer.writeBytes(returnCode)
buffer.writeBytes([UInt8.carriageReturn, UInt8.newline])

// Pass our response through the decoder
XCTAssertNoThrow(try self.channel.writeInbound(buffer))

// Read the decoded response
if let decoded = try self.channel.readInbound(as: MemcachedResponse.self) {
switch decoded {
case .set(let setResponse):
XCTAssertEqual(setResponse.status, expectedResponseCode)
}
XCTAssertEqual(decoded.returnCode, expectedReturnCode)
} else {
XCTFail("Failed to decode the inbound response.")
}
}

func testDecodeSetStoredResponse() throws {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the Set from the method names since the responses don't indicate the request methods.

let storedResponseCode = [UInt8(ascii: "H"), UInt8(ascii: "D")]
try testDecodeSetResponse(responseCode: storedResponseCode, expectedResponseCode: .stored)
let storedReturnCode = [UInt8(ascii: "H"), UInt8(ascii: "D")]
try testDecodeSetResponse(returnCode: storedReturnCode, expectedReturnCode: .stored)
}

func testDecodeSetNotStoredResponse() throws {
let notStoredResponseCode = [UInt8(ascii: "N"), UInt8(ascii: "S")]
try testDecodeSetResponse(responseCode: notStoredResponseCode, expectedResponseCode: .notStored)
let notStoredReturnCode = [UInt8(ascii: "N"), UInt8(ascii: "S")]
try testDecodeSetResponse(returnCode: notStoredReturnCode, expectedReturnCode: .notStored)
}

func testDecodeSetExistResponse() throws {
let existResponseCode = [UInt8(ascii: "E"), UInt8(ascii: "X")]
try testDecodeSetResponse(responseCode: existResponseCode, expectedResponseCode: .exists)
let existReturnCode = [UInt8(ascii: "E"), UInt8(ascii: "X")]
try testDecodeSetResponse(returnCode: existReturnCode, expectedReturnCode: .exists)
}

func testDecodeSetNotFoundResponse() throws {
let notFoundResponseCode = [UInt8(ascii: "N"), UInt8(ascii: "F")]
try testDecodeSetResponse(responseCode: notFoundResponseCode, expectedResponseCode: .notFound)
try testDecodeSetResponse(returnCode: notFoundResponseCode, expectedReturnCode: .notFound)
}
}