Skip to content

Commit 7f1fa52

Browse files
committed
Support keep-alive connections for HTTP (REST) channel
1 parent df430a0 commit 7f1fa52

File tree

3 files changed

+137
-74
lines changed

3 files changed

+137
-74
lines changed

include/mg_rpc_channel_http.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
#ifndef CS_MOS_LIBS_RPC_COMMON_SRC_MG_RPC_MG_RPC_CHANNEL_HTTP_H_
19-
#define CS_MOS_LIBS_RPC_COMMON_SRC_MG_RPC_MG_RPC_CHANNEL_HTTP_H_
18+
#pragma once
2019

2120
#include "mg_rpc_channel.h"
2221

@@ -38,7 +37,8 @@ extern "C" {
3837
*/
3938
struct mg_rpc_channel *mg_rpc_channel_http(struct mg_connection *nc,
4039
const char *default_auth_domain,
41-
const char *default_auth_file);
40+
const char *default_auth_file,
41+
bool *is_new);
4242

4343
/*
4444
* Should be called by the http endpoint handler, on the event
@@ -59,5 +59,3 @@ void mg_rpc_channel_http_recd_parsed_frame(struct mg_connection *nc,
5959
#endif
6060

6161
#endif /* defined(MGOS_HAVE_HTTP_SERVER) && MGOS_ENABLE_RPC_CHANNEL_HTTP */
62-
63-
#endif /* CS_MOS_LIBS_RPC_COMMON_SRC_MG_RPC_MG_RPC_CHANNEL_HTTP_H_ */

src/mg_rpc_channel_http.c

Lines changed: 125 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,38 @@
1818
#if defined(MGOS_HAVE_HTTP_SERVER) && MGOS_ENABLE_RPC_CHANNEL_HTTP
1919

2020
#include "mg_rpc_channel_http.h"
21+
2122
#include "mg_rpc.h"
2223
#include "mg_rpc_channel.h"
2324
#include "mg_rpc_channel_tcp_common.h"
2425

2526
#include "common/cs_dbg.h"
27+
#include "common/queue.h"
2628
#include "frozen.h"
2729

2830
#include "mgos_hal.h"
2931

3032
static const char *s_headers =
3133
"Content-Type: application/json\r\n"
3234
"Access-Control-Allow-Origin: *\r\n"
33-
"Access-Control-Allow-Headers: *\r\n"
34-
"Connection: close\r\n";
35+
"Access-Control-Allow-Headers: *";
3536

3637
struct mg_rpc_channel_http_data {
3738
struct mg_mgr *mgr;
3839
struct mg_connection *nc;
3940
struct http_message *hm;
41+
struct mg_rpc_channel *ch;
4042
const char *default_auth_domain;
4143
const char *default_auth_file;
44+
bool is_keep_alive;
4245
bool is_rest;
46+
bool is_open;
47+
SLIST_ENTRY(mg_rpc_channel_http_data) next;
4348
};
4449

50+
SLIST_HEAD(s_http_chd, mg_rpc_channel_http_data)
51+
s_http_chd = SLIST_HEAD_INITIALIZER(&s_http_chd);
52+
4553
static void ch_closed(void *arg) {
4654
struct mg_rpc_channel *ch = (struct mg_rpc_channel *) arg;
4755
ch->ev_handler(ch, MG_RPC_CHANNEL_CLOSED, NULL);
@@ -55,7 +63,7 @@ static bool nc_is_valid(struct mg_rpc_channel *ch) {
5563
(struct mg_rpc_channel_http_data *) ch->channel_data;
5664
if (chd->nc == NULL) return false;
5765
for (c = mg_next(chd->mgr, NULL); c != NULL; c = mg_next(chd->mgr, c)) {
58-
if (c == chd->nc) return true;
66+
if (c == chd->nc && !(c->flags & MG_F_CLOSE_IMMEDIATELY)) return true;
5967
}
6068
chd->nc = NULL;
6169
mgos_invoke_cb(ch_closed, ch, false /* from_isr */);
@@ -153,62 +161,67 @@ static const char *mg_rpc_channel_http_get_type(struct mg_rpc_channel *ch) {
153161
}
154162

155163
static char *mg_rpc_channel_http_get_info(struct mg_rpc_channel *ch) {
156-
struct mg_rpc_channel_http_data *chd =
157-
(struct mg_rpc_channel_http_data *) ch->channel_data;
164+
struct mg_rpc_channel_http_data *chd = ch->channel_data;
158165
return (nc_is_valid(ch) ? mg_rpc_channel_tcp_get_info(chd->nc) : NULL);
159166
}
160167

161168
/*
162169
* Timer callback which emits SENT and CLOSED events to mg_rpc.
163170
*/
164171
static void mg_rpc_channel_http_frame_sent(void *param) {
165-
struct mg_rpc_channel *ch = (struct mg_rpc_channel *) param;
172+
struct mg_rpc_channel *ch = param;
173+
struct mg_rpc_channel_http_data *chd =
174+
(struct mg_rpc_channel_http_data *) ch->channel_data;
166175
ch->ev_handler(ch, MG_RPC_CHANNEL_FRAME_SENT, (void *) 1);
167-
ch->ev_handler(ch, MG_RPC_CHANNEL_CLOSED, NULL);
176+
if (!chd->is_keep_alive) {
177+
ch->ev_handler(ch, MG_RPC_CHANNEL_CLOSED, NULL);
178+
}
168179
}
169180

170181
static void mg_rpc_channel_http_ch_destroy(struct mg_rpc_channel *ch) {
171-
free(ch->channel_data);
182+
struct mg_rpc_channel_http_data *chd =
183+
(struct mg_rpc_channel_http_data *) ch->channel_data;
184+
SLIST_REMOVE(&s_http_chd, chd, mg_rpc_channel_http_data, next);
185+
free(chd);
172186
free(ch);
173187
}
174188

189+
extern const char *mg_status_message(int status_code);
190+
175191
static bool mg_rpc_channel_http_send_frame(struct mg_rpc_channel *ch,
176192
const struct mg_str f) {
177193
struct mg_rpc_channel_http_data *chd =
178194
(struct mg_rpc_channel_http_data *) ch->channel_data;
179195
if (!nc_is_valid(ch)) {
180196
return false;
181197
}
198+
struct mg_connection *nc = chd->nc;
182199

200+
int code = 200;
201+
struct mg_str body;
183202
if (chd->is_rest) {
184203
struct json_token result_tok = JSON_INVALID_TOKEN;
185-
int error_code = 0;
186-
char *error_msg = NULL;
187-
json_scanf(f.p, f.len, "{result: %T, error: {code: %d, message: %Q}}",
188-
&result_tok, &error_code, &error_msg);
189-
190-
if (result_tok.type != JSON_TYPE_INVALID) {
191-
/* Got some result */
192-
mg_send_response_line(chd->nc, 200, s_headers);
193-
mg_printf(chd->nc, "%.*s\r\n", (int) result_tok.len, result_tok.ptr);
194-
} else if (error_code != 0) {
195-
if (error_code != 404) error_code = 500;
196-
/* Got some error */
197-
mg_http_send_error(chd->nc, error_code, error_msg);
204+
struct json_token error_tok = JSON_INVALID_TOKEN;
205+
json_scanf(f.p, f.len, "{result: %T, error: %T}", &result_tok, &error_tok);
206+
if (error_tok.len != 0) {
207+
code = 500;
208+
body = mg_mk_str_n(error_tok.ptr, error_tok.len);
198209
} else {
199-
/* Empty result - that is legal. */
200-
mg_send_response_line(chd->nc, 200, s_headers);
201-
}
202-
if (error_msg != NULL) {
203-
free(error_msg);
210+
body = mg_mk_str_n(result_tok.ptr, result_tok.len);
204211
}
205212
} else {
206-
mg_send_response_line(chd->nc, 200, s_headers);
207-
mg_printf(chd->nc, "%.*s\r\n", (int) f.len, f.p);
213+
body = f;
214+
}
215+
mg_send_response_line(nc, code, s_headers);
216+
mg_printf(nc, "Content-Length: %d\r\n", (int) body.len + 2);
217+
mg_printf(nc, "Connection: %s\r\n",
218+
(chd->is_keep_alive ? "keep-alive" : "close"));
219+
mg_printf(nc, "\r\n%.*s\r\n", (int) body.len, body.p);
220+
221+
if (!chd->is_keep_alive) {
222+
nc->flags |= MG_F_SEND_AND_CLOSE;
223+
chd->nc = NULL;
208224
}
209-
210-
chd->nc->flags |= MG_F_SEND_AND_CLOSE;
211-
chd->nc = NULL;
212225

213226
/*
214227
* Schedule a callback which will emit SENT and CLOSED events. mg_rpc expects
@@ -220,40 +233,9 @@ static bool mg_rpc_channel_http_send_frame(struct mg_rpc_channel *ch,
220233
return true;
221234
}
222235

223-
struct mg_rpc_channel *mg_rpc_channel_http(struct mg_connection *nc,
224-
const char *default_auth_domain,
225-
const char *default_auth_file) {
226-
struct mg_rpc_channel *ch = (struct mg_rpc_channel *) calloc(1, sizeof(*ch));
227-
ch->ch_connect = mg_rpc_channel_http_ch_connect;
228-
ch->send_frame = mg_rpc_channel_http_send_frame;
229-
ch->ch_close = mg_rpc_channel_http_ch_close;
230-
ch->ch_destroy = mg_rpc_channel_http_ch_destroy;
231-
ch->get_type = mg_rpc_channel_http_get_type;
232-
/*
233-
* New channel is created for each incoming HTTP request, so the channel
234-
* is not persistent.
235-
*
236-
* Rationale for this behaviour, instead of updating channel's destination on
237-
* each incoming frame, is that this won't work with asynchronous responses.
238-
*/
239-
ch->is_persistent = mg_rpc_channel_false;
240-
/*
241-
* HTTP channel expects exactly one response.
242-
* We don't want random broadcasts to be sent as a response.
243-
*/
244-
ch->is_broadcast_enabled = mg_rpc_channel_false;
245-
ch->get_authn_info = mg_rpc_channel_http_get_authn_info;
246-
ch->send_not_authorized = mg_rpc_channel_http_send_not_authorized;
247-
ch->get_info = mg_rpc_channel_http_get_info;
248-
249-
struct mg_rpc_channel_http_data *chd =
250-
(struct mg_rpc_channel_http_data *) calloc(1, sizeof(*chd));
251-
chd->default_auth_domain = default_auth_domain;
252-
chd->default_auth_file = default_auth_file;
253-
chd->mgr = nc->mgr;
254-
ch->channel_data = chd;
255-
nc->user_data = ch;
256-
return ch;
236+
static bool is_keepalive(struct http_message *hm) {
237+
struct mg_str *conn_hdr = mg_get_http_header(hm, "Connection");
238+
return (conn_hdr != NULL && mg_vcasecmp(conn_hdr, "keep-alive") == 0);
257239
}
258240

259241
void mg_rpc_channel_http_recd_frame(struct mg_connection *nc,
@@ -264,7 +246,13 @@ void mg_rpc_channel_http_recd_frame(struct mg_connection *nc,
264246
(struct mg_rpc_channel_http_data *) ch->channel_data;
265247
chd->nc = nc;
266248
chd->hm = hm;
267-
ch->ev_handler(ch, MG_RPC_CHANNEL_OPEN, NULL);
249+
chd->is_rest = false;
250+
chd->is_keep_alive = is_keepalive(hm);
251+
252+
if (!chd->is_open) {
253+
chd->is_open = true;
254+
ch->ev_handler(ch, MG_RPC_CHANNEL_OPEN, NULL);
255+
}
268256
ch->ev_handler(ch, MG_RPC_CHANNEL_FRAME_RECD, (void *) &frame);
269257
}
270258

@@ -278,6 +266,18 @@ void mg_rpc_channel_http_recd_parsed_frame(struct mg_connection *nc,
278266
chd->nc = nc;
279267
chd->hm = hm;
280268
chd->is_rest = true;
269+
chd->is_keep_alive = is_keepalive(hm);
270+
271+
if (mg_vcasecmp(&hm->method, "OPTIONS") == 0) {
272+
// CORS check.
273+
mg_send_response_line(chd->nc, 200, s_headers);
274+
mg_printf(nc, "Content-Length: %d\r\n", 0);
275+
mg_printf(nc, "Connection: %s\r\n",
276+
(chd->is_keep_alive ? "keep-alive" : "close"));
277+
mg_printf(nc, "\r\n");
278+
if (!chd->is_keep_alive) chd->nc->flags |= MG_F_SEND_AND_CLOSE;
279+
return;
280+
}
281281

282282
/* Prepare "parsed" frame */
283283
struct mg_rpc_frame frame;
@@ -289,8 +289,66 @@ void mg_rpc_channel_http_recd_parsed_frame(struct mg_connection *nc,
289289
frame.id = mg_mk_str(ids);
290290

291291
/* "Open" the channel and send the frame */
292-
ch->ev_handler(ch, MG_RPC_CHANNEL_OPEN, NULL);
292+
if (!chd->is_open) {
293+
chd->is_open = true;
294+
ch->ev_handler(ch, MG_RPC_CHANNEL_OPEN, NULL);
295+
}
296+
293297
ch->ev_handler(ch, MG_RPC_CHANNEL_FRAME_RECD_PARSED, &frame);
294298
}
295299

300+
struct mg_rpc_channel *mg_rpc_channel_http(struct mg_connection *nc,
301+
const char *default_auth_domain,
302+
const char *default_auth_file,
303+
bool *is_new) {
304+
struct mg_rpc_channel *ch = NULL;
305+
struct mg_rpc_channel_http_data *chd, *chdt;
306+
SLIST_FOREACH_SAFE(chd, &s_http_chd, next, chdt) {
307+
if (chd->nc == nc) {
308+
ch = chd->ch;
309+
} else {
310+
// Close channels for which connections may have been closed.
311+
nc_is_valid(chd->ch);
312+
}
313+
}
314+
315+
if (ch == NULL) {
316+
ch = (struct mg_rpc_channel *) calloc(1, sizeof(*ch));
317+
if (ch == NULL) return NULL;
318+
*is_new = true;
319+
} else {
320+
*is_new = false;
321+
}
322+
323+
ch->ch_connect = mg_rpc_channel_http_ch_connect;
324+
ch->send_frame = mg_rpc_channel_http_send_frame;
325+
ch->ch_close = mg_rpc_channel_http_ch_close;
326+
ch->ch_destroy = mg_rpc_channel_http_ch_destroy;
327+
ch->get_type = mg_rpc_channel_http_get_type;
328+
ch->is_persistent = mg_rpc_channel_false;
329+
// No broadcasts here, it's a request-response channel.
330+
ch->is_broadcast_enabled = mg_rpc_channel_false;
331+
ch->get_authn_info = mg_rpc_channel_http_get_authn_info;
332+
ch->send_not_authorized = mg_rpc_channel_http_send_not_authorized;
333+
ch->get_info = mg_rpc_channel_http_get_info;
334+
335+
if (*is_new) {
336+
chd = (struct mg_rpc_channel_http_data *) calloc(1, sizeof(*chd));
337+
} else {
338+
chd = ch->channel_data;
339+
}
340+
341+
chd->default_auth_domain = default_auth_domain;
342+
chd->default_auth_file = default_auth_file;
343+
chd->mgr = nc->mgr;
344+
ch->channel_data = chd;
345+
chd->ch = ch;
346+
347+
if (*is_new) {
348+
SLIST_INSERT_HEAD(&s_http_chd, chd, next);
349+
}
350+
351+
return ch;
352+
}
353+
296354
#endif /* defined(MGOS_HAVE_HTTP_SERVER) && MGOS_ENABLE_RPC_CHANNEL_HTTP */

src/mgos_rpc.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,19 @@ static void mgos_rpc_http_handler(struct mg_connection *nc, int ev,
7575
void *ev_data, void *user_data) {
7676
if (ev == MG_EV_HTTP_REQUEST) {
7777
/* Create and add the channel to mg_rpc */
78+
bool is_new = false;
7879
struct mg_rpc_channel *ch =
7980
mg_rpc_channel_http(nc, mgos_sys_config_get_http_auth_domain(),
80-
mgos_sys_config_get_http_auth_file());
81+
mgos_sys_config_get_http_auth_file(), &is_new);
8182
struct http_message *hm = (struct http_message *) ev_data;
8283
size_t prefix_len = sizeof(HTTP_URI_PREFIX) - 1;
83-
mg_rpc_add_channel(mgos_rpc_get_global(), mg_mk_str(""), ch);
84+
if (ch == NULL) {
85+
mg_http_send_error(nc, 500, "Failed to create channel");
86+
return;
87+
}
88+
if (is_new) {
89+
mg_rpc_add_channel(mgos_rpc_get_global(), mg_mk_str(""), ch);
90+
}
8491

8592
/*
8693
* Handle the request. If there is method name after /rpc,

0 commit comments

Comments
 (0)