Skip to content
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ let package = Package(
dependencies: [
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
]
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,24 @@ extension ByteBuffer {
self.writeString(string)
}
}

extension ByteBuffer {
/// Checks if the next two bytes in the buffer are a carriage return and newline.
/// Does not consume any bytes.
func isNextEndOfLine() -> Bool {
return self.readableBytes >= 2 &&
self.getInteger(at: self.readerIndex, as: UInt8.self) == UInt8.carriageReturn &&
self.getInteger(at: self.readerIndex + 1, as: UInt8.self) == UInt8.newline
}

/// Consumes the next two bytes in the buffer if they are a carriage return and newline.
/// Returns `true` if the end of line was successfully consumed, `false` otherwise.
mutating func consumeEndOfLine() -> Bool {
guard self.isNextEndOfLine() else {
return false
}

self.moveReaderIndex(forwardBy: 2)
return true
}
}
41 changes: 41 additions & 0 deletions Sources/SwiftMemcache/MemcachedResponse.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-memcache-gsoc open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-memcache-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-memcache-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

struct MemcachedResponse {
enum ReturnCode {
case stored
case notStored
case exists
case notFound

init(_ bytes: UInt16) {
switch bytes {
case 0x4844: // "HD"
self = .stored
case 0x4E53: // "NS"
self = .notStored
case 0x4558: // "EX"
self = .exists
case 0x4E46: // "NF"
self = .notFound
default:
preconditionFailure("Unrecognized response code.")
}
}
}

var returnCode: ReturnCode
var dataLength: UInt64?
var flags: [UInt8]
}
179 changes: 179 additions & 0 deletions Sources/SwiftMemcache/MemcachedResponseDecoder.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-memcache-gsoc open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-memcache-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-memcache-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore
import NIOPosix

/// Responses look like:
///
/// <RC> <datalen*> <flag1> <flag2> <...>\r\n
///
/// Where <RC> is a 2 character return code. The number of flags returned are
/// based off of the flags supplied.
///
/// <datalen> is only for responses with payloads, with the return code 'VA'.
///
/// Flags are single character codes, ie 'q' or 'k' or 'I', which adjust the
/// behavior of the command. If a flag requests a response flag (ie 't' for TTL
/// remaining), it is returned in the same order as they were in the original
/// command, though this is not strict.
///
/// Flags are single character codes, ie 'q' or 'k' or 'O', which adjust the
/// behavior of a command. Flags may contain token arguments, which come after the
/// flag and before the next space or newline, ie 'Oopaque' or 'Kuserkey'. Flags
/// can return new data or reflect information, in the same order they were
/// supplied in the request. Sending an 't' flag with a get for an item with 20
/// seconds of TTL remaining, would return 't20' in the response.
///
/// All commands accept a tokens 'P' and 'L' which are completely ignored. The
/// arguments to 'P' and 'L' can be used as hints or path specifications to a
/// proxy or router inbetween a client and a memcached daemon. For example, a
/// client may prepend a "path" in the key itself: "mg /path/foo v" or in a proxy
/// token: "mg foo Lpath/ v" - the proxy may then optionally remove or forward the
/// token to a memcached daemon, which will ignore them.
///
/// Syntax errors are handled the same as noted under 'Error strings' section
/// below.
///
/// For usage examples beyond basic syntax, please see the wiki:
/// https://github.com/memcached/memcached/wiki/MetaCommands
struct MemcachedResponseDecoder: NIOSingleStepByteToMessageDecoder {
typealias InboundOut = MemcachedResponse

/// Describes the errors that can occur during the decoding process.
enum MemcachedDecoderError: Error {
/// This error is thrown when EOF is encountered but there are still
/// readable bytes in the buffer, which can indicate a bad message.
case unexpectedEOF

/// This error is thrown when EOF is encountered but the decoder's next step
/// is not `.none`. This error suggests that the message ended prematurely,
/// possibly indicating a bad message.
case unexpectedNextStep(NextStep)
}

/// The next step that the decoder will take. The value of this enum determines how the decoder
/// processes the current state of the ByteBuffer.
enum NextStep: Hashable {
/// No further steps are needed, the decoding process is complete for the current message.
case none
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we never transition to that state and since our state machine is looping we can get rid of this

/// The initial step.
case returnCode
/// Decode the data length, flags or check if we are the end
case dataLengthOrFlag(MemcachedResponse.ReturnCode)
/// Decode the next flag
case decodeNextFlag(MemcachedResponse.ReturnCode, UInt64?, [UInt8])
/// Decode end of line
case decodeEndOfLine(MemcachedResponse.ReturnCode, UInt64?, [UInt8])
}

/// The action that the decoder will take in response to the current state of the ByteBuffer and the `NextStep`.
enum NextDecodeAction {
/// We need more bytes to decode the next step.
case waitForMoreBytes
/// We can continue decoding.
case continueDecodeLoop
/// We have decoded the next response and need to return it.
case returnDecodedResponse(MemcachedResponse)
}

/// The next step in decoding.
var nextStep: NextStep = .returnCode

mutating func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
while self.nextStep != .none {
switch try self.next(buffer: &buffer) {
case .returnDecodedResponse(let response):
return response

case .waitForMoreBytes:
return nil

case .continueDecodeLoop:
()
}
}
return nil
}

mutating func next(buffer: inout ByteBuffer) throws -> NextDecodeAction {
switch self.nextStep {
case .none:
return .waitForMoreBytes

case .returnCode:
guard let bytes = buffer.readInteger(as: UInt16.self) else {
return .waitForMoreBytes
}

let returnCode = MemcachedResponse.ReturnCode(bytes)
self.nextStep = .dataLengthOrFlag(returnCode)
return .continueDecodeLoop

case .dataLengthOrFlag(let returnCode):
if returnCode == .stored {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct we should only decode the data length if the returnCode is VA

// TODO: Implement decoding of data length
}

// Check if the next bytes are \r\n
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of checking if the next two bytes are something we should rather read the next byte and just check if it is a whitespace. If is a whitespace we have to decode flags otherwise we check if it is a \ and continue to read the next 3 bytes. It is fine if we return waitForMoreBytes when we got a \ and don't have 3 more bytes in the buffer. This should be rarely hit and we don't have to introduce a new state.

if buffer.consumeEndOfLine() {
let response = MemcachedResponse(returnCode: returnCode, dataLength: nil, flags: [])
self.nextStep = .returnCode
return .returnDecodedResponse(response)
} else {
self.nextStep = .decodeNextFlag(returnCode, nil, [])
return .continueDecodeLoop
}

case .decodeNextFlag(let returnCode, let dataLength, var flags):
if let nextByte = buffer.readInteger(as: UInt8.self), nextByte != UInt8.whitespace {
flags.append(nextByte)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is going to allocate a new array each time, I could ignore flags for the time being

self.nextStep = .decodeNextFlag(returnCode, dataLength, flags)
return .continueDecodeLoop
} else {
self.nextStep = .decodeEndOfLine(returnCode, dataLength, flags)
return .continueDecodeLoop
}

case .decodeEndOfLine(let returnCode, let dataLength, let flags):
guard buffer.consumeEndOfLine() else {
return .waitForMoreBytes
}

let response = MemcachedResponse(returnCode: returnCode, dataLength: dataLength, flags: flags)
self.nextStep = .returnCode
return .returnDecodedResponse(response)
}
}

mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> MemcachedResponse? {
// Try to decode what is left in the buffer.
if let output = try self.decode(buffer: &buffer) {
return output
}

guard buffer.readableBytes == 0 || seenEOF else {
// If there are still readable bytes left and we haven't seen an EOF
// then something is wrong with the message or how we called the decoder.
throw MemcachedDecoderError.unexpectedEOF
}

switch self.nextStep {
case .none, .returnCode:
return nil
default:
throw MemcachedDecoderError.unexpectedNextStep(self.nextStep)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ final class MemcachedIntegrationTest: XCTestCase {
self.channel = ClientBootstrap(group: self.group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelInitializer { channel in
channel.pipeline.addHandler(MessageToByteHandler(MemcachedRequestEncoder()))
return channel.pipeline.addHandlers([MessageToByteHandler(MemcachedRequestEncoder()), ByteToMessageHandler(MemcachedResponseDecoder())])
}
}

Expand All @@ -36,6 +36,21 @@ final class MemcachedIntegrationTest: XCTestCase {
super.tearDown()
}

class ResponseHandler: ChannelInboundHandler {
typealias InboundIn = MemcachedResponse

let p: EventLoopPromise<MemcachedResponse>

init(p: EventLoopPromise<MemcachedResponse>) {
self.p = p
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let response = self.unwrapInboundIn(data)
self.p.succeed(response)
}
}

func testConnectionToMemcachedServer() throws {
do {
let connection = try channel.connect(host: "memcached", port: 11211).wait()
Expand All @@ -47,15 +62,24 @@ final class MemcachedIntegrationTest: XCTestCase {
let command = MemcachedRequest.SetCommand(key: "foo", value: buffer)
let request = MemcachedRequest.set(command)

// Write the request to the connection and wait for the result
connection.writeAndFlush(request).whenComplete { result in
switch result {
case .success:
print("Request successfully sent to the server.")
case .failure(let error):
XCTFail("Failed to send request: \(error)")
}
}
// Write the request to the connection
_ = connection.write(request)

// Prepare the promise for the response
let promise = connection.eventLoop.makePromise(of: MemcachedResponse.self)
let responseHandler = ResponseHandler(p: promise)
_ = connection.pipeline.addHandler(responseHandler)

// Flush and then read the response from the server
connection.flush()
connection.read()

// Wait for the promise to be fulfilled
let response = try promise.futureResult.wait()

// Check the response from the server.
print("Response return code: \(response.returnCode)")

} catch {
XCTFail("Failed to connect to Memcached server: \(error)")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-memcache-gsoc open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-memcache-gsoc project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-memcache-gsoc project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore
import NIOEmbedded
@testable import SwiftMemcache
import XCTest

final class MemcachedResponseDecoderTests: XCTestCase {
var decoder: MemcachedResponseDecoder!
var channel: EmbeddedChannel!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a Channel here we can just call the methods on the decoder without the channel.


override func setUp() {
super.setUp()
self.decoder = MemcachedResponseDecoder()
self.channel = EmbeddedChannel(handler: ByteToMessageHandler(self.decoder))
}

override func tearDown() {
XCTAssertNoThrow(try self.channel.finish())
}

func testDecodeSetResponse(returnCode: [UInt8], expectedReturnCode: MemcachedResponse.ReturnCode) throws {
// Prepare a response buffer with a response code
var buffer = ByteBufferAllocator().buffer(capacity: 8)
buffer.writeBytes(returnCode)
buffer.writeBytes([UInt8.carriageReturn, UInt8.newline])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a helper method called makeMemcachedResponseByteBuffer that takes a MemcachedResponse and encodes it for us. We will have a bunch more tests down the line and this will help us write them easily


// Pass our response through the decoder
XCTAssertNoThrow(try self.channel.writeInbound(buffer))

// Read the decoded response
if let decoded = try self.channel.readInbound(as: MemcachedResponse.self) {
XCTAssertEqual(decoded.returnCode, expectedReturnCode)
} else {
XCTFail("Failed to decode the inbound response.")
}
}

func testDecodeSetStoredResponse() throws {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the Set from the method names since the responses don't indicate the request methods.

let storedReturnCode = [UInt8(ascii: "H"), UInt8(ascii: "D")]
try testDecodeSetResponse(returnCode: storedReturnCode, expectedReturnCode: .stored)
}

func testDecodeSetNotStoredResponse() throws {
let notStoredReturnCode = [UInt8(ascii: "N"), UInt8(ascii: "S")]
try testDecodeSetResponse(returnCode: notStoredReturnCode, expectedReturnCode: .notStored)
}

func testDecodeSetExistResponse() throws {
let existReturnCode = [UInt8(ascii: "E"), UInt8(ascii: "X")]
try testDecodeSetResponse(returnCode: existReturnCode, expectedReturnCode: .exists)
}

func testDecodeSetNotFoundResponse() throws {
let notFoundResponseCode = [UInt8(ascii: "N"), UInt8(ascii: "F")]
try testDecodeSetResponse(returnCode: notFoundResponseCode, expectedReturnCode: .notFound)
}
}