Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ci/clang-check-format.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"

cd $DIR/../

# DIR dos2unix to clean-up line endings
echo "Applying dos2unix"
find . -iname *.hpp -o -iname *.cpp -o -iname *.tpp -o -iname *.h | grep -vi config.h | grep -vi thirdparty | xargs dos2unix

# Apply clang-format
echo "Applying clang-format"
find . -iname *.hpp -o -iname *.cpp -o -iname *.tpp -o -iname *.h | grep -vi config.h | grep -vi thirdparty | xargs clang-format -i --verbose style=Google
69 changes: 35 additions & 34 deletions include/rtps/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,57 +25,58 @@ Author: i11 - Embedded Software, RWTH Aachen University
#ifndef RTPS_THREADPOOL_H
#define RTPS_THREADPOOL_H

#include "rtps/config.h"
#include "lwip/sys.h"
#include "rtps/communication/UdpDriver.h"
#include "rtps/communication/PacketInfo.h"
#include "rtps/communication/UdpDriver.h"
#include "rtps/config.h"
#include "rtps/storages/PBufWrapper.h"
#include "rtps/storages/ThreadSafeCircularBuffer.h"

#include <array>

namespace rtps {

class Writer;


class ThreadPool {
public:
using receiveJumppad_fp = void(*)(void* callee, const PacketInfo& packet);

ThreadPool(receiveJumppad_fp receiveCallback, void* callee);
class Writer;

~ThreadPool();
class ThreadPool {
public:
using receiveJumppad_fp = void (*)(void *callee, const PacketInfo &packet);

bool startThreads();
void stopThreads();
ThreadPool(receiveJumppad_fp receiveCallback, void *callee);

void clearQueues();
bool addWorkload(Writer* workload);
bool addNewPacket(PacketInfo&& packet);
~ThreadPool();

static void readCallback(void* arg, udp_pcb* pcb, pbuf* p, const ip_addr_t* addr, Ip4Port_t port);
bool startThreads();
void stopThreads();

void clearQueues();
bool addWorkload(Writer *workload);
bool addNewPacket(PacketInfo &&packet);

private:
receiveJumppad_fp m_receiveJumppad;
void* m_callee;
bool m_running = false;
std::array<sys_thread_t, Config::THREAD_POOL_NUM_WRITERS> m_writers;
std::array<sys_thread_t, Config::THREAD_POOL_NUM_READERS> m_readers;
static void readCallback(void *arg, udp_pcb *pcb, pbuf *p,
const ip_addr_t *addr, Ip4Port_t port);

sys_sem_t m_readerNotificationSem;
sys_sem_t m_writerNotificationSem;
private:
receiveJumppad_fp m_receiveJumppad;
void *m_callee;
bool m_running = false;
std::array<sys_thread_t, Config::THREAD_POOL_NUM_WRITERS> m_writers;
std::array<sys_thread_t, Config::THREAD_POOL_NUM_READERS> m_readers;

ThreadSafeCircularBuffer<Writer*, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH> m_queueOutgoing;
ThreadSafeCircularBuffer<PacketInfo, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH> m_queueIncoming;
sys_sem_t m_readerNotificationSem;
sys_sem_t m_writerNotificationSem;

static void writerThreadFunction(void* arg);
static void readerThreadFunction(void* arg);
void doWriterWork();
void doReaderWork();
ThreadSafeCircularBuffer<Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH>
m_queueOutgoing;
ThreadSafeCircularBuffer<PacketInfo,
Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH>
m_queueIncoming;

};
}
static void writerThreadFunction(void *arg);
static void readerThreadFunction(void *arg);
void doWriterWork();
void doReaderWork();
};
} // namespace rtps

#endif //RTPS_THREADPOOL_H
#endif // RTPS_THREADPOOL_H
168 changes: 85 additions & 83 deletions include/rtps/common/Locator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,97 +25,99 @@ Author: i11 - Embedded Software, RWTH Aachen University
#ifndef RTPS_LOCATOR_T_H
#define RTPS_LOCATOR_T_H

#include "ucdr/microcdr.h"
#include "rtps/utils/udpUtils.h"
#include "rtps/communication/UdpDriver.h"
#include "rtps/utils/udpUtils.h"
#include "ucdr/microcdr.h"

#include <array>

namespace rtps{
enum class LocatorKind_t : int32_t{
LOCATOR_KIND_INVALID = -1,
LOCATOR_KIND_RESERVED = 0,
LOCATOR_KIND_UDPv4 = 1,
LOCATOR_KIND_UDPv6 = 2
};

const uint32_t LOCATOR_PORT_INVALID = 0;
const std::array<uint8_t, 16> LOCATOR_ADDRESS_INVALID = {0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};

struct Locator{
LocatorKind_t kind = LocatorKind_t::LOCATOR_KIND_INVALID;
uint32_t port = LOCATOR_PORT_INVALID;
std::array<uint8_t,16> address = LOCATOR_ADDRESS_INVALID; // TODO make private such that kind and address always match?

static Locator createUDPv4Locator(uint8_t a, uint8_t b, uint8_t c, uint8_t d, uint32_t port){
Locator locator;
locator.kind = LocatorKind_t::LOCATOR_KIND_UDPv4;
locator.address = {0,0,0,0,0,0,0,0,0,0,0,0,a,b,c,d};
locator.port = port;
return locator;
}

void setInvalid(){
kind = LocatorKind_t::LOCATOR_KIND_INVALID;
}

bool isValid() const{
return kind != LocatorKind_t::LOCATOR_KIND_INVALID;
}

bool readFromUcdrBuffer(ucdrBuffer& buffer){
if(ucdr_buffer_remaining(&buffer) < sizeof(Locator)){
return false;
}else{
ucdr_deserialize_array_uint8_t(&buffer, reinterpret_cast<uint8_t*>(this), sizeof(Locator));
return true;
}
}

bool serializeIntoUdcrBuffer(ucdrBuffer& buffer){
if(ucdr_buffer_remaining(&buffer) < sizeof(Locator)){
return false;
}else{
ucdr_serialize_array_uint8_t(&buffer, reinterpret_cast<uint8_t*>(this), sizeof(Locator));
}
}

ip4_addr_t getIp4Address() const{
return transformIP4ToU32(address[12], address[13], address[14], address[15]);
}

inline bool isSameSubnet() const{
return UdpDriver::isSameSubnet(getIp4Address());
}

} __attribute__((packed));

inline Locator getBuiltInUnicastLocator(ParticipantId_t participantId) {
return Locator::createUDPv4Locator(Config::IP_ADDRESS[0], Config::IP_ADDRESS[1],
Config::IP_ADDRESS[2], Config::IP_ADDRESS[3],
getBuiltInUnicastPort(participantId));
namespace rtps {
enum class LocatorKind_t : int32_t {
LOCATOR_KIND_INVALID = -1,
LOCATOR_KIND_RESERVED = 0,
LOCATOR_KIND_UDPv4 = 1,
LOCATOR_KIND_UDPv6 = 2
};

const uint32_t LOCATOR_PORT_INVALID = 0;
const std::array<uint8_t, 16> LOCATOR_ADDRESS_INVALID = {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

struct Locator {
LocatorKind_t kind = LocatorKind_t::LOCATOR_KIND_INVALID;
uint32_t port = LOCATOR_PORT_INVALID;
std::array<uint8_t, 16> address =
LOCATOR_ADDRESS_INVALID; // TODO make private such that kind and address
// always match?

static Locator createUDPv4Locator(uint8_t a, uint8_t b, uint8_t c, uint8_t d,
uint32_t port) {
Locator locator;
locator.kind = LocatorKind_t::LOCATOR_KIND_UDPv4;
locator.address = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, a, b, c, d};
locator.port = port;
return locator;
}

void setInvalid() { kind = LocatorKind_t::LOCATOR_KIND_INVALID; }

bool isValid() const { return kind != LocatorKind_t::LOCATOR_KIND_INVALID; }

bool readFromUcdrBuffer(ucdrBuffer &buffer) {
if (ucdr_buffer_remaining(&buffer) < sizeof(Locator)) {
return false;
} else {
ucdr_deserialize_array_uint8_t(&buffer, reinterpret_cast<uint8_t *>(this),
sizeof(Locator));
return true;
}

inline Locator getBuiltInMulticastLocator() {
return Locator::createUDPv4Locator(239, 255, 0, 1, getBuiltInMulticastPort());
}

bool serializeIntoUdcrBuffer(ucdrBuffer &buffer) {
if (ucdr_buffer_remaining(&buffer) < sizeof(Locator)) {
return false;
} else {
ucdr_serialize_array_uint8_t(&buffer, reinterpret_cast<uint8_t *>(this),
sizeof(Locator));
}
}

inline Locator getUserUnicastLocator(ParticipantId_t participantId) {
return Locator::createUDPv4Locator(Config::IP_ADDRESS[0], Config::IP_ADDRESS[1],
Config::IP_ADDRESS[2], Config::IP_ADDRESS[3],
getUserUnicastPort(participantId));
}
ip4_addr_t getIp4Address() const {
return transformIP4ToU32(address[12], address[13], address[14],
address[15]);
}

inline Locator getUserMulticastLocator() {
return Locator::createUDPv4Locator(Config::IP_ADDRESS[0], Config::IP_ADDRESS[1],
Config::IP_ADDRESS[2], Config::IP_ADDRESS[3],
getUserMulticastPort());
}
inline bool isSameSubnet() const {
return UdpDriver::isSameSubnet(getIp4Address());
}

inline Locator getDefaultSendMulticastLocator() {
return Locator::createUDPv4Locator(239, 255, 0, 1,
getBuiltInMulticastPort());
}
} __attribute__((packed));

inline Locator getBuiltInUnicastLocator(ParticipantId_t participantId) {
return Locator::createUDPv4Locator(
Config::IP_ADDRESS[0], Config::IP_ADDRESS[1], Config::IP_ADDRESS[2],
Config::IP_ADDRESS[3], getBuiltInUnicastPort(participantId));
}

inline Locator getBuiltInMulticastLocator() {
return Locator::createUDPv4Locator(239, 255, 0, 1, getBuiltInMulticastPort());
}

inline Locator getUserUnicastLocator(ParticipantId_t participantId) {
return Locator::createUDPv4Locator(
Config::IP_ADDRESS[0], Config::IP_ADDRESS[1], Config::IP_ADDRESS[2],
Config::IP_ADDRESS[3], getUserUnicastPort(participantId));
}

inline Locator getUserMulticastLocator() {
return Locator::createUDPv4Locator(
Config::IP_ADDRESS[0], Config::IP_ADDRESS[1], Config::IP_ADDRESS[2],
Config::IP_ADDRESS[3], getUserMulticastPort());
}

inline Locator getDefaultSendMulticastLocator() {
return Locator::createUDPv4Locator(239, 255, 0, 1, getBuiltInMulticastPort());
}
} // namespace rtps

#endif //RTPS_LOCATOR_T_H
#endif // RTPS_LOCATOR_T_H
Loading