Skip to content

Commit ddbe274

Browse files
authored
Merge pull request #10 from Asmod4n/master
small maintenance
2 parents e34e6b3 + 098a69b commit ddbe274

File tree

3 files changed

+55
-54
lines changed

3 files changed

+55
-54
lines changed

mrblib/proxy.rb

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,4 @@
11
module ZMQ
2-
class Proxy_fn
3-
def initialize(options)
4-
@background = ZMQ::Socket.new(options[:background][:type])
5-
@background.bind(options[:background][:endpoint])
6-
@foreground = ZMQ::Socket.new(options[:foreground][:type])
7-
@foreground.bind(options[:foreground][:endpoint])
8-
@control = ZMQ::Pair.new(options[:_control_endpoint])
9-
end
10-
11-
def run
12-
LibZMQ.proxy_steerable(@background, @foreground, @control)
13-
end
14-
end
15-
162
class Proxy
173
ENDPOINT = if LibZMQ.has? "ipc"
184
"ipc://*"
@@ -25,7 +11,7 @@ def initialize(options = {})
2511
@control = ZMQ::Pair.new(ENDPOINT, :bind)
2612
_options[:_control_endpoint] = @control.last_endpoint
2713
@thread = ZMQ::Thread.new
28-
@proxy = @thread.new(ZMQ::Proxy_fn, _options)
14+
@proxy = @thread.new(Proxy_fn, _options)
2915
@proxy.async(:run)
3016
end
3117

@@ -43,11 +29,24 @@ def terminate
4329
@control.send("TERMINATE")
4430
@control.close
4531
@thread.close
46-
remove_instance_variable(:@options)
4732
remove_instance_variable(:@control)
4833
remove_instance_variable(:@thread)
4934
remove_instance_variable(:@proxy)
5035
nil
5136
end
37+
38+
class Proxy_fn
39+
def initialize(options)
40+
@background = ZMQ::Socket.new(options[:background][:type])
41+
@background.bind(options[:background][:endpoint])
42+
@foreground = ZMQ::Socket.new(options[:foreground][:type])
43+
@foreground.bind(options[:foreground][:endpoint])
44+
@control = ZMQ::Pair.new(options[:_control_endpoint])
45+
end
46+
47+
def run
48+
LibZMQ.proxy_steerable(@background, @foreground, @control)
49+
end
50+
end
5251
end
5352
end

src/mrb_libzmq.c

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ mrb_zmq_ctx_get(mrb_state *mrb, mrb_value self)
5757
{
5858
mrb_int option_name;
5959
mrb_get_args(mrb, "i", &option_name);
60-
assert(option_name >= INT_MIN && option_name <= INT_MAX);
60+
assert(option_name >= (int) INT_MIN && option_name <= (int) INT_MAX);
6161

6262
int rc = zmq_ctx_get(MRB_LIBZMQ_CONTEXT(mrb), option_name);
6363
if (unlikely(rc == -1)) {
@@ -72,8 +72,8 @@ mrb_zmq_ctx_set(mrb_state *mrb, mrb_value self)
7272
{
7373
mrb_int option_name, option_value;
7474
mrb_get_args(mrb, "ii", &option_name, &option_value);
75-
assert(option_name >= INT_MIN && option_name <= INT_MAX);
76-
assert(option_value >= INT_MIN && option_value <= INT_MAX);
75+
assert(option_name >= (int) INT_MIN && option_name <= (int) INT_MAX);
76+
assert(option_value >= (int) INT_MIN && option_value <= (int) INT_MAX);
7777

7878
int rc = zmq_ctx_set(MRB_LIBZMQ_CONTEXT(mrb), option_name, option_value);
7979
if (unlikely(rc == -1)) {
@@ -127,9 +127,8 @@ mrb_zmq_getsockopt(mrb_state *mrb, mrb_value self)
127127
mrb_get_args(mrb, "diC|i", &socket, &mrb_zmq_socket_type, &option_name, &option_type, &string_return_len);
128128
assert(string_return_len >= 0);
129129

130-
struct RClass* option_class = mrb_class_ptr(option_type);
131-
size_t option_len = 0;
132-
int rc = -1;
130+
size_t option_len;
131+
int rc;
133132

134133
if (option_name == ZMQ_FD) {
135134
SOCKET fd;
@@ -138,13 +137,13 @@ mrb_zmq_getsockopt(mrb_state *mrb, mrb_value self)
138137
if (unlikely(rc == -1)) {
139138
mrb_zmq_handle_error(mrb, "zmq_getsockopt");
140139
}
141-
if (MRB_INT_MAX < fd) {
142-
return mrb_float_value(mrb, fd);
143-
} else {
140+
if (POSFIXABLE(fd)) {
144141
return mrb_fixnum_value(fd);
142+
} else {
143+
return mrb_float_value(mrb, fd);
145144
}
146145
}
147-
else if (option_class == mrb->true_class||option_class == mrb->false_class) {
146+
else if (mrb_class_ptr(option_type) == mrb->true_class||mrb_class_ptr(option_type) == mrb->false_class) {
148147
int boolean;
149148
option_len = sizeof(boolean);
150149
rc = zmq_getsockopt(socket, option_name, &boolean, &option_len);
@@ -153,7 +152,7 @@ mrb_zmq_getsockopt(mrb_state *mrb, mrb_value self)
153152
}
154153
return mrb_bool_value(boolean);
155154
}
156-
else if (option_class == mrb->fixnum_class) {
155+
else if (mrb_class_ptr(option_type) == mrb->fixnum_class) {
157156
int number;
158157
option_len = sizeof(number);
159158
rc = zmq_getsockopt(socket, option_name, &number, &option_len);
@@ -162,7 +161,7 @@ mrb_zmq_getsockopt(mrb_state *mrb, mrb_value self)
162161
}
163162
return mrb_fixnum_value(number);
164163
}
165-
else if (option_class == mrb->float_class) {
164+
else if (mrb_class_ptr(option_type) == mrb->float_class) {
166165
int64_t number;
167166
option_len = sizeof(number);
168167
rc = zmq_getsockopt(socket, option_name, &number, &option_len);
@@ -175,7 +174,7 @@ mrb_zmq_getsockopt(mrb_state *mrb, mrb_value self)
175174
return mrb_float_value(mrb, number);
176175
#endif
177176
}
178-
else if (option_class == mrb->string_class) {
177+
else if (mrb_class_ptr(option_type) == mrb->string_class) {
179178
mrb_value buf = mrb_str_new(mrb, NULL, string_return_len);
180179
option_len = RSTRING_CAPA(buf);
181180
rc = zmq_getsockopt(socket, option_name, RSTRING_PTR(buf), &option_len);
@@ -248,7 +247,7 @@ mrb_zmq_msg_send(mrb_state *mrb, mrb_value self)
248247
void *socket;
249248
mrb_int flags;
250249
mrb_get_args(mrb, "ddi", &msg, &mrb_zmq_msg_type, &socket, &mrb_zmq_socket_type, &flags);
251-
assert(flags >= INT_MIN && flags <= INT_MAX);
250+
assert(flags >= (int) INT_MIN && flags <= (int) INT_MAX);
252251

253252
int rc = zmq_msg_send(msg, socket, flags);
254253
if (unlikely(rc == -1)) {
@@ -265,7 +264,7 @@ mrb_zmq_msg_size(mrb_state *mrb, mrb_value self)
265264
mrb_get_args(mrb, "d", &msg, &mrb_zmq_msg_type);
266265

267266
size_t size = zmq_msg_size(msg);
268-
if (FIXABLE(size))
267+
if (POSFIXABLE(size))
269268
return mrb_fixnum_value(size);
270269
return mrb_float_value(mrb, size);
271270
}
@@ -329,7 +328,7 @@ mrb_zmq_send(mrb_state *mrb, mrb_value self)
329328
mrb_value message;
330329
mrb_int flags;
331330
mrb_get_args(mrb, "doi", &socket, &mrb_zmq_socket_type, &message, &flags);
332-
assert(flags >= INT_MIN && flags <= INT_MAX);
331+
assert(flags >= (int) INT_MIN && flags <= (int) INT_MAX);
333332

334333
message = mrb_str_to_str(mrb, message);
335334

@@ -348,7 +347,7 @@ mrb_zmq_setsockopt(mrb_state *mrb, mrb_value self)
348347
mrb_int option_name;
349348
mrb_value option_value;
350349
mrb_get_args(mrb, "dio", &socket, &mrb_zmq_socket_type, &option_name, &option_value);
351-
assert(option_name >= INT_MIN && option_name <= INT_MAX);
350+
assert(option_name >= (int) INT_MIN && option_name <= (int) INT_MAX);
352351

353352
int rc;
354353

@@ -366,12 +365,12 @@ mrb_zmq_setsockopt(mrb_state *mrb, mrb_value self)
366365
rc = zmq_setsockopt(socket, option_name, &boolean, sizeof(boolean));
367366
} break;
368367
case MRB_TT_FIXNUM: {
369-
assert(mrb_fixnum(option_value) >= INT_MIN && mrb_fixnum(option_value) <= INT_MAX);
368+
assert(mrb_fixnum(option_value) >= (int) INT_MIN && mrb_fixnum(option_value) <= (int) INT_MAX);
370369
int number = (int) mrb_fixnum(option_value);
371370
rc = zmq_setsockopt(socket, option_name, &number, sizeof(number));
372371
} break;
373372
case MRB_TT_FLOAT: {
374-
assert(mrb_float(option_value) >= INT64_MIN && mrb_float(option_value) <= INT64_MAX);
373+
assert(mrb_float(option_value) >= (int64_t) INT64_MIN && mrb_float(option_value) <= (int64_t) INT64_MAX);
375374
int64_t number = (int64_t) mrb_float(option_value);
376375
rc = zmq_setsockopt(socket, option_name, &number, sizeof(number));
377376
} break;
@@ -508,7 +507,7 @@ mrb_zmq_socket(mrb_state *mrb, mrb_value self)
508507

509508
mrb_int type;
510509
mrb_get_args(mrb, "i", &type);
511-
assert(type >= 0 && type <= INT_MAX);
510+
assert(type >= 0 && type <= (int) INT_MAX);
512511

513512
void *socket = zmq_socket(MRB_LIBZMQ_CONTEXT(mrb), type);
514513
if (likely(socket)) {
@@ -527,7 +526,7 @@ mrb_zmq_socket_monitor(mrb_state *mrb, mrb_value self)
527526
char *addr;
528527
mrb_int events;
529528
mrb_get_args(mrb, "dzi", &socket, &mrb_zmq_socket_type, &addr, &events);
530-
assert(events >= 0 && events <= INT_MAX);
529+
assert(events >= 0 && events <= (int) INT_MAX);
531530

532531
int rc = zmq_socket_monitor(socket, addr, events);
533532
if (unlikely(rc == -1)) {
@@ -542,11 +541,11 @@ mrb_zmq_socket_recv(mrb_state *mrb, mrb_value self)
542541
{
543542
mrb_int flags = 0;
544543
mrb_get_args(mrb, "|i", &flags);
545-
assert(flags >= INT_MIN && flags <= INT_MAX);
544+
assert(flags >= (int) INT_MIN && flags <= (int) INT_MAX);
546545

547546
mrb_value data = mrb_nil_value();
548547

549-
int more = 0;
548+
int more;
550549
struct RClass *zmq_msg_class = mrb_class_get_under(mrb, mrb_module_get(mrb, "ZMQ"), "Msg");
551550

552551
do {
@@ -840,7 +839,7 @@ mrb_zmq_poller_add(mrb_state *mrb, mrb_value self)
840839
int rc;
841840
if (mrb_obj_respond_to(mrb, socket_class, mrb_intern_lit(mrb, "to_i"))) {
842841
mrb_int fd = mrb_fixnum(mrb_Integer(mrb, socket));
843-
assert(fd >= INT_MIN&&fd <= INT_MAX);
842+
assert(fd >= (int) INT_MIN&&fd <= (int) INT_MAX);
844843
rc = zmq_poller_add_fd(DATA_PTR(self), fd, mrb_ptr(socket), events);
845844
if (unlikely(rc == -1)) {
846845
mrb_zmq_handle_error(mrb, "zmq_poller_add_fd");
@@ -872,7 +871,7 @@ mrb_zmq_poller_modify(mrb_state *mrb, mrb_value self)
872871
int rc;
873872
if (mrb_obj_respond_to(mrb, socket_class, mrb_intern_lit(mrb, "to_i"))) {
874873
mrb_int fd = mrb_fixnum(mrb_Integer(mrb, socket));
875-
assert(fd >= INT_MIN&&fd <= INT_MAX);
874+
assert(fd >= (int) INT_MIN&&fd <= (int) INT_MAX);
876875
rc = zmq_poller_modify_fd(DATA_PTR(self), fd, events);
877876
if (unlikely(rc == -1)) {
878877
mrb_zmq_handle_error(mrb, "zmq_poller_modify_fd");
@@ -900,7 +899,7 @@ mrb_zmq_poller_remove(mrb_state *mrb, mrb_value self)
900899
int rc;
901900
if (mrb_obj_respond_to(mrb, socket_class, mrb_intern_lit(mrb, "to_i"))) {
902901
mrb_int fd = mrb_fixnum(mrb_Integer(mrb, socket));
903-
assert(fd >= INT_MIN&&fd <= INT_MAX);
902+
assert(fd >= (int) INT_MIN&&fd <= (int) INT_MAX);
904903
rc = zmq_poller_remove_fd(DATA_PTR(self), fd);
905904
if (unlikely(rc == -1)) {
906905
mrb_zmq_handle_error(mrb, "zmq_poller_remove_fd");

src/mrb_libzmq.h

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@ typedef struct {
110110
static void
111111
mrb_zmq_gc_threadclose(mrb_state *mrb, void *mrb_zmq_thread_data_)
112112
{
113-
if (likely(mrb_zmq_thread_data_)) {
113+
if (mrb_zmq_thread_data_) {
114114
mrb_zmq_thread_data_t *mrb_zmq_thread_data = (mrb_zmq_thread_data_t *) mrb_zmq_thread_data_;
115115
if (likely(mrb_zmq_thread_data->frontend)) {
116-
int disable = 0; // The other side might have crashed, don't wait on sending ther term msg.
116+
int disable = 0; // The other side might have crashed, don't wait on sending the term msg.
117117
zmq_setsockopt(mrb_zmq_thread_data->frontend, ZMQ_SNDTIMEO, &disable, sizeof(disable));
118118
zmq_send_const(mrb_zmq_thread_data->frontend, "TERM$", 5, 0);
119119
}
@@ -172,7 +172,7 @@ static int
172172
#else
173173
static void
174174
#endif
175-
mrb_zmq_thread_close_gem_final(mrb_state *mrb, struct RBasic *obj, void *target_module)
175+
mrb_zmq_thread_close_gem_final(mrb_state *mrb, struct RBasic *obj, void *thread_class)
176176
{
177177
/* filter dead objects */
178178
if (mrb_object_dead_p(mrb, obj)) {
@@ -205,8 +205,8 @@ mrb_zmq_thread_close_gem_final(mrb_state *mrb, struct RBasic *obj, void *target_
205205
#endif
206206
}
207207

208-
if (mrb_obj_is_kind_of(mrb, mrb_obj_value(obj), (struct RClass *)target_module)) {
209-
mrb_value thread_val = mrb_obj_value(obj);
208+
mrb_value thread_val = mrb_obj_value(obj);
209+
if (mrb_obj_is_kind_of(mrb, thread_val, (struct RClass *)thread_class)) {
210210
mrb_zmq_gc_threadclose(mrb, DATA_PTR(thread_val));
211211
mrb_iv_remove(mrb, thread_val, mrb_intern_lit(mrb, "@pipe"));
212212
mrb_data_init(thread_val, NULL, NULL);
@@ -222,7 +222,7 @@ static int
222222
#else
223223
static void
224224
#endif
225-
mrb_zmq_zmq_close_gem_final(mrb_state *mrb, struct RBasic *obj, void *target_module)
225+
mrb_zmq_zmq_close_gem_final(mrb_state *mrb, struct RBasic *obj, void *socket_class)
226226
{
227227
/* filter dead objects */
228228
if (mrb_object_dead_p(mrb, obj)) {
@@ -255,12 +255,15 @@ mrb_zmq_zmq_close_gem_final(mrb_state *mrb, struct RBasic *obj, void *target_mod
255255
#endif
256256
}
257257

258-
if (mrb_obj_is_kind_of(mrb, mrb_obj_value(obj), (struct RClass *)target_module)) {
259-
mrb_value socket_val = mrb_obj_value(obj);
260-
int wait500ms = 500; // we wait 500 miliseconds for each socket to close.
261-
zmq_setsockopt(DATA_PTR(socket_val), ZMQ_LINGER, &wait500ms, sizeof(wait500ms));
262-
zmq_close(DATA_PTR(socket_val));
263-
mrb_data_init(socket_val, NULL, NULL);
258+
mrb_value socket_val = mrb_obj_value(obj);
259+
if (mrb_obj_is_kind_of(mrb, socket_val, (struct RClass *)socket_class)) {
260+
void *socket = DATA_PTR(socket_val);
261+
if (socket) {
262+
int wait500ms = 500; // we wait up to 500 miliseconds for each socket to close when mruby is closed via mrb_close(mrb).
263+
zmq_setsockopt(socket, ZMQ_LINGER, &wait500ms, sizeof(wait500ms));
264+
zmq_close(socket);
265+
mrb_data_init(socket_val, NULL, NULL);
266+
}
264267
}
265268
#ifdef MRB_EACH_OBJ_OK
266269
return MRB_EACH_OBJ_OK;

0 commit comments

Comments
 (0)