Skip to content

Commit e8b0381

Browse files
author
Ayush Jain
committed
Server side throughput mesaurement
1 parent 9555f3a commit e8b0381

File tree

7 files changed

+54
-26
lines changed

7 files changed

+54
-26
lines changed

apps/rpc_test/config_files/app.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ benchmarks:
44
client_poll_threads: 1
55
client_connections: 1
66
client_threads: 1
7-
rate: 1000000
7+
rate: 4000000
88
server_duration: 50 #server_duration in seconds
99
client_duration: 50
1010
client_batch_size: 32
1111
server_address: "172.19.0.121:8501"
1212
server_poll_threads: 1
13-
experiment: "mc_tput_16"
13+
experiment: "small_rpc_tput"
1414
core_affinity: [49, 64]

apps/rpc_test/src/benchmarks.cpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ rrr::Counter BenchmarkServiceImpl::at_counter;
44

55
void Benchmarks::create_server(){
66

7-
csi = new BenchmarkServiceImpl(conf->output_size_);
7+
8+
char dat_fil[256];
9+
sprintf(dat_fil, "data/sv_%s.csv", conf->exp_name.c_str());
10+
csi = new BenchmarkServiceImpl(conf->output_size_, std::string(dat_fil));
811

912

1013
pollmgr_ = new rrr::PollMgr(conf->server_poll_threads_);
@@ -35,7 +38,7 @@ void Benchmarks::create_server(){
3538

3639
}
3740
void Benchmarks::stop_server(){
38-
41+
csi->closeFile();
3942
pollmgr_->stop_threads();
4043
pollmgr_->release();
4144

@@ -99,7 +102,7 @@ void* Benchmarks::launch_client_thread(void *arg){
99102
rrr::Log::error( "Failed to open CSV file, %s", data_file_name);
100103
exit(EXIT_FAILURE);
101104
}
102-
csvFile << "Sent, Rate\n";
105+
csvFile << "Sent,Rate\n";
103106
csvFile.close();
104107
}
105108
std::ofstream csvFile(data_file_name); // Open file in append mode
@@ -109,19 +112,19 @@ void* Benchmarks::launch_client_thread(void *arg){
109112
}
110113
while(!ct->stop){
111114

112-
// while (((last_sent + cycle_wait) >= rte_get_timer_cycles())) {
113-
// ;
114-
// }
115+
while (((last_sent + cycle_wait) >= rte_get_timer_cycles())) {
116+
;
117+
}
115118

116119
(pr->add_bench_async());
117120
c++;
118121
if(c % (1000*1000) == 0){
119122
n++;
120123

121-
csvFile << c <<", " << c/t.elapsed() <<std::endl;
124+
csvFile << c <<"," << c/t.elapsed() <<std::endl;
122125

123126
}
124-
//last_sent = rte_get_timer_cycles();
127+
last_sent = rte_get_timer_cycles();
125128
//fg.wait_all();
126129

127130
#ifdef DPDK

apps/rpc_test/src/benchmarks.hpp

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,40 +56,57 @@ class BenchmarkServiceImpl : public CounterService {
5656
unsigned int time_;
5757
uint16_t out_size=1;
5858
rrr::PollMgr* pollmgr_;
59-
59+
rrr::Timer server_timer;
60+
std::string data_file_name;
6061

6162
std::string out_string;
63+
std::ofstream csvFile;
6264
public:
6365
uint64_t count_=0;
6466
static rrr::Counter at_counter;
65-
BenchmarkServiceImpl(uint16_t num_out): out_size(num_out){
67+
BenchmarkServiceImpl(uint16_t num_out, std::string data_file): out_size(num_out), data_file_name(data_file){
6668
for(int i=0;i<num_out;i++){
6769
out_string.push_back('a'+ rand()%26);
6870
}
71+
csvFile.open(data_file_name.c_str());
72+
csvFile << "Received,rate\n";
73+
at_counter.next();
74+
6975
}
7076

7177
void add() {
72-
count_++;
78+
//count_++;
7379
}
7480

7581
void add_long(const rrr::i32& a, const rrr::i32& b, const rrr::i32& c, const rrr::i64& d, const rrr::i64& e, const std::vector<rrr::i64>& input, rrr::i32* out, std::vector<rrr::i64>* output) {
76-
count_++;
82+
//count_++;
7783
output->insert(output->end(), {1, 2/*, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10*/});
7884
}
7985

8086
void add_short(const rrr::i64& a, rrr::i32* out) {
81-
count_++;
87+
//count_++;
8288
*out = a+1;
8389
}
8490
void add_bench(const std::string& in, std::string* out ) {
8591
// rrr::Log::info(__LINE__,__FILE__, "Out size = %d * 32",out_size);
8692
//count_++;
87-
// at_counter.next();
93+
94+
uint64_t val = at_counter.next();
8895
out->append(out_string.c_str());
96+
if(val == 1000*1000){
97+
server_timer.start();
98+
}
99+
else if((val%(1000*1000)) == 0){
100+
csvFile << val <<"," << val/server_timer.elapsed() <<std::endl;
101+
}
102+
103+
}
104+
void closeFile(){
105+
csvFile.close();
89106
}
90107
};
91108
class Benchmarks{
92-
BenchmarkServiceImpl *csi;
109+
93110
BenchmarkProxy** service_proxies;
94111
pthread_t** client_threads;
95112
std::thread stat_thread;
@@ -109,6 +126,7 @@ class Benchmarks{
109126
rrr::PollMgr* pollmgr_;
110127

111128
public:
129+
BenchmarkServiceImpl *csi;
112130
Benchmarks(AppConfig* config) : conf(config){
113131
rrr::Log::info("Core Affinity from %d - %d", conf->core_affinity_mask_[0], conf->core_affinity_mask_[1]);
114132
for(int i=conf->core_affinity_mask_[0];i <= conf->core_affinity_mask_[1];i++)

pylib/simplerpcgen/lang_cpp.pyc

0 Bytes
Binary file not shown.

pylib/simplerpcgen/rpcgen.pyc

0 Bytes
Binary file not shown.

src/rpc/client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ namespace rrr
162162
unsigned int available;
163163
for (int j = 0; j < nr_inrings; j++)
164164
{
165-
unsigned int nb_pkts = rte_ring_sc_dequeue_burst(conn->in_bufring[j], (void **)pkt_array, 16, &available);
165+
unsigned int nb_pkts = rte_ring_sc_dequeue_burst(conn->in_bufring[j], (void **)pkt_array, 32, &available);
166166
for (int i = 0; i < nb_pkts; i++)
167167
{
168168

src/rpc/dpdk_transport/transport_ev_loop.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ namespace rrr
124124
send_all++;
125125
}
126126
oconn->burst_size=32;
127-
accepted.insert({conn_id, oconn->conn_id});
127+
while(accepted.insert({conn_id, oconn->conn_id}).second ==false )
128+
;
128129
LOG_DEBUG("Accepted: %s, chosen thread %d", ConnToString(oconn->conn_id).c_str(),oconn->chosen_thread);
129130
conn_counter.next();
130131
conn_th_lock.unlock();
@@ -240,7 +241,8 @@ namespace rrr
240241
wait = 0;
241242
while (!oconn->connected_)
242243
{
243-
usleep(500 * 1000);
244+
usleep(
245+
500*1000);
244246
wait++;
245247
if (wait > 2)
246248
{
@@ -330,7 +332,7 @@ namespace rrr
330332
t_layer->process_sm_req(ctx);
331333
// transmit
332334
t_layer->do_transmit(ctx);
333-
335+
334336
t_layer->do_poll_job(ctx);
335337
}
336338
Log_info("Exiting EV thread %d, num pkts sent: %lu, num pkts received: %lu dropped_pakcets: %lu",
@@ -350,8 +352,8 @@ namespace rrr
350352
for (int i = 0; i < nb_sm_reqs_; i++)
351353
{
352354
ctx->out_connections[ctx->conn_arr[i]->conn_id] = ctx->conn_arr[i]; // Put the connection in local conn_table
353-
ctx->t_conns.insert(ctx->conn_arr[i]);
354-
// LOG_DEBUG("Added Connection %lu to thread %d, out_ring: %s", ctx->conn_arr[i]->conn_id, ctx->thread_id, ctx->conn_arr[i]->out_bufring[ctx->thread_id]->name);
355+
while(ctx->t_conns.insert(ctx->conn_arr[i]).second == false);
356+
LOG_DEBUG("Added Connection %lu to thread %d, out_ring: %s", ctx->conn_arr[i]->conn_id, ctx->thread_id, ctx->conn_arr[i]->out_bufring[ctx->thread_id]->name);
355357
}
356358
//if (nb_sm_reqs_ > 0 && ctx->thread_id == 8)
357359
// for (TransportConnection* current_conn : ctx->t_conns)
@@ -605,10 +607,15 @@ namespace rrr
605607
nb_polls = rte_ring_sc_dequeue_burst(ctx->poll_req_q, (void **)ctx->poll_reqs, 8, &available);
606608
for (int i = 0; i < nb_polls; i++)
607609
{
608-
ctx->poll_jobs.insert((Pollable *)(ctx->poll_reqs[i]));
610+
LOG_DEBUG("Client added poll job %p", ctx->poll_reqs[i]);
611+
while(ctx->poll_jobs.insert((Pollable *)(ctx->poll_reqs[i])).second == false);
612+
}
613+
614+
std::unordered_set<Pollable*>::iterator it = ctx->poll_jobs.begin();
615+
for(;it != ctx->poll_jobs.end(); ++it){
616+
LOG_DEBUG("RUNNNING POll JOB!!")
617+
(*it)->handle_read();
609618
}
610-
for (Pollable *poll : ctx->poll_jobs)
611-
poll->handle_read();
612619
}
613620

614621
}

0 commit comments

Comments
 (0)