3 static VALUE ENC
; /* LWES_ENCODING */
4 static ID id_TYPE_DB
, id_TYPE_LIST
, id_NAME
, id_HAVE_ENCODING
;
5 static ID id_new
, id_enc
, id_size
;
8 static void dump_name(VALUE name
, LWES_BYTE_P buf
, size_t *off
)
10 char *s
= RSTRING_PTR(name
);
12 if (marshall_SHORT_STRING(s
, buf
, MAX_MSG_SIZE
, off
) > 0)
14 rb_raise(rb_eRuntimeError
, "failed to dump name=%s", s
);
17 static int dump_bool(VALUE name
, VALUE val
, LWES_BYTE_P buf
, size_t *off
)
19 LWES_BOOLEAN tmp
= FALSE
;
23 } else if (val
!= Qfalse
) {
24 volatile VALUE raise_inspect
;
26 rb_raise(rb_eTypeError
, "non-boolean set for %s: %s",
30 dump_name(name
, buf
, off
);
31 lwesrb_dump_type(LWES_BOOLEAN_TOKEN
, buf
, off
);
32 return marshall_BOOLEAN(tmp
, buf
, MAX_MSG_SIZE
, off
);
35 static int dump_string(VALUE name
, VALUE val
, LWES_BYTE_P buf
, size_t *off
)
37 char *dst
= StringValuePtr(val
);
39 dump_name(name
, buf
, off
);
40 lwesrb_dump_type(LWES_STRING_TOKEN
, buf
, off
);
41 return marshall_LONG_STRING(dst
, buf
, MAX_MSG_SIZE
, off
);
44 static void dump_enc(VALUE enc
, LWES_BYTE_P buf
, size_t *off
)
46 dump_name(ENC
, buf
, off
);
47 lwesrb_dump_num(LWES_INT_16_TOKEN
, enc
, buf
, off
);
50 static char *my_strdup(const char *str
)
52 long len
= strlen(str
) + 1;
53 char *rv
= xmalloc(len
);
60 /* the underlying struct for LWES::Emitter */
61 struct _rb_lwes_emitter
{
62 struct lwes_emitter
*emitter
;
66 LWES_BOOLEAN emit_heartbeat
;
71 /* gets the _rb_lwes_emitter struct pointer from self */
72 static struct _rb_lwes_emitter
* _rle(VALUE self
)
74 struct _rb_lwes_emitter
*rle
;
76 Data_Get_Struct(self
, struct _rb_lwes_emitter
, rle
);
81 /* GC automatically calls this when object is finalized */
82 static void rle_free(void *ptr
)
84 struct _rb_lwes_emitter
*rle
= ptr
;
87 lwes_emitter_destroy(rle
->emitter
);
93 /* called by the GC when object is allocated */
94 static VALUE
rle_alloc(VALUE klass
)
96 struct _rb_lwes_emitter
*rle
;
98 return Data_Make_Struct(klass
, struct _rb_lwes_emitter
,
105 * key => [ numeric_type, Numeric ],
108 * memo - lwes_event pointer
110 static VALUE
event_hash_iter_i(VALUE kv
, VALUE memo
)
112 volatile VALUE raise_inspect
;
113 VALUE
*tmp
= (VALUE
*)memo
;
117 LWES_BYTE_P buf
= (LWES_BYTE_P
)tmp
[0];
118 size_t *off
= (size_t *)tmp
[1];
120 if (TYPE(kv
) != T_ARRAY
|| RARRAY_LEN(kv
) != 2)
121 rb_raise(rb_eTypeError
,
122 "hash iteration not giving key-value pairs");
123 tmp
= RARRAY_PTR(kv
);
126 if (name
== sym_enc
) return Qnil
; /* already dumped first */
128 name
= rb_obj_as_string(name
);
130 if (strcmp(RSTRING_PTR(name
), LWES_ENCODING
) == 0)
138 rv
= dump_bool(name
, val
, buf
, off
);
141 dump_name(name
, buf
, off
);
142 lwesrb_dump_num_ary(val
, buf
, off
);
145 rv
= dump_string(name
, val
, buf
, off
);
152 rb_raise(rb_eArgError
, "unhandled type %s=%s",
153 RSTRING_PTR(name
), RAISE_INSPECT(val
));
157 static VALUE
emit_hash(VALUE self
, VALUE name
, VALUE event
)
159 struct _rb_lwes_emitter
*rle
= _rle(self
);
160 LWES_BYTE_P buf
= rle
->emitter
->buffer
;
164 int size
= NUM2INT(rb_funcall(event
, id_size
, 0, 0));
168 tmp
[1] = (VALUE
)&off
;
170 if (size
< 0 || size
> UINT16_MAX
)
171 rb_raise(rb_eRangeError
, "hash size out of uint16 range");
173 /* event name first */
174 dump_name(name
, buf
, &off
);
176 /* number of attributes second */
177 rv
= marshall_U_INT_16((LWES_U_INT_16
)size
, buf
, MAX_MSG_SIZE
, &off
);
179 rb_raise(rb_eRuntimeError
, "failed to dump num_attrs");
181 /* dump encoding before other fields */
182 enc
= rb_hash_aref(event
, sym_enc
);
184 enc
= rb_hash_aref(event
, ENC
);
186 dump_enc(enc
, buf
, &off
);
188 /* the rest of the fields */
189 rb_iterate(rb_each
, event
, event_hash_iter_i
, (VALUE
)&tmp
);
191 if (lwes_emitter_emit_bytes(rle
->emitter
, buf
, off
) < 0)
192 rb_raise(rb_eRuntimeError
, "failed to emit event");
205 volatile VALUE raise_inspect
;
208 case LWES_TYPE_STRING
:
209 if (dump_string(name
, val
, buf
, off
) > 0)
212 case LWES_TYPE_BOOLEAN
:
213 if (dump_bool(name
, val
, buf
, off
) > 0)
217 dump_name(name
, buf
, off
);
218 lwesrb_dump_num(type
, val
, buf
, off
);
222 rb_raise(rb_eRuntimeError
, "failed to set %s=%s",
223 RSTRING_PTR(name
), RAISE_INSPECT(val
));
226 static void lwes_struct_class(
235 *event_class
= CLASS_OF(event
);
236 type_db
= rb_const_get(*event_class
, id_TYPE_DB
);
238 if (CLASS_OF(type_db
) != cLWES_TypeDB
)
239 rb_raise(rb_eArgError
, "class does not have valid TYPE_DB");
241 *name
= rb_const_get(*event_class
, id_NAME
);
242 Check_Type(*name
, T_STRING
);
243 *type_list
= rb_const_get(*event_class
, id_TYPE_LIST
);
244 Check_Type(*type_list
, T_ARRAY
);
246 *have_enc
= rb_const_get(*event_class
, id_HAVE_ENCODING
);
249 static VALUE
emit_struct(VALUE self
, VALUE event
)
251 VALUE event_class
, name
, type_list
, have_enc
;
252 struct _rb_lwes_emitter
*rle
= _rle(self
);
253 LWES_BYTE_P buf
= rle
->emitter
->buffer
;
257 LWES_U_INT_16 num_attr
= 0;
261 lwes_struct_class(&event_class
, &name
, &type_list
, &have_enc
, event
);
264 dump_name(name
, buf
, &off
);
266 /* number of attributes, use a placeholder until we've iterated */
268 if (marshall_U_INT_16(0, buf
, MAX_MSG_SIZE
, &off
) < 0)
269 rb_raise(rb_eRuntimeError
,
270 "failed to marshal number_of_attributes");
272 /* dump encoding before other fields */
273 if (have_enc
== Qtrue
) {
274 VALUE enc
= rb_funcall(event
, id_enc
, 0, 0);
277 dump_enc(enc
, buf
, &off
);
281 i
= RARRAY_LEN(type_list
);
282 flds
= RSTRUCT_PTR(event
);
283 tmp
= RARRAY_PTR(type_list
);
284 for (; --i
>= 0; tmp
++, flds
++) {
285 /* inner: [ :field_sym, "field_name", type ] */
286 VALUE
*inner
= RARRAY_PTR(*tmp
);
290 if (inner
[0] == sym_enc
) /* encoding was already dumped */
295 continue; /* LWES doesn't know nil */
298 type
= NUM2INT(inner
[2]);
300 marshal_field(name
, type
, val
, buf
, &off
);
303 /* now we've iterated, we can accurately give num_attr */
304 if (marshall_U_INT_16(num_attr
, buf
, MAX_MSG_SIZE
, &num_attr_off
) <= 0)
305 rb_raise(rb_eRuntimeError
, "failed to marshal num_attr");
307 if (lwes_emitter_emit_bytes(rle
->emitter
, buf
, off
) < 0)
308 rb_raise(rb_eRuntimeError
, "failed to emit event");
315 * emitter = LWES::Emitter.new
316 * event = EventStruct.new
320 static VALUE
emitter_ltlt(VALUE self
, VALUE event
)
322 Check_Type(event
, T_STRUCT
);
324 return emit_struct(self
, event
);
329 * emitter = LWES::Emitter.new
331 * emitter.emit("EventName", :foo => "HI")
333 * emitter.emit(EventStruct, :foo => "HI")
335 * struct = EventStruct.new
337 * emitter.emit(struct)
339 static VALUE
emitter_emit(int argc
, VALUE
*argv
, VALUE self
)
341 volatile VALUE raise_inspect
;
344 argc
= rb_scan_args(argc
, argv
, "11", &name
, &event
);
346 switch (TYPE(name
)) {
348 if (TYPE(event
) == T_HASH
)
349 return emit_hash(self
, name
, event
);
350 rb_raise(rb_eTypeError
,
351 "second argument must be a hash when first "
355 rb_raise(rb_eArgError
,
356 "second argument not allowed when first"
359 return emit_struct(self
, event
);
361 if (TYPE(event
) != T_HASH
)
362 rb_raise(rb_eTypeError
,
363 "second argument must be a Hash when first"
367 * we can optimize this so there's no intermediate
370 event
= rb_funcall(name
, id_new
, 1, event
);
371 return emit_struct(self
, event
);
373 rb_raise(rb_eArgError
,
374 "bad argument: %s, must be a String, Struct or Class",
375 RAISE_INSPECT(name
));
378 assert(0 && "should never get here");
383 * Destroys the associated lwes_emitter and the associated socket. This
384 * method is rarely needed as Ruby garbage collection will take care of
385 * closing for you, but may be useful in odd cases when it is desirable
386 * to release file descriptors ASAP.
388 static VALUE
emitter_close(VALUE self
)
390 struct _rb_lwes_emitter
*rle
= _rle(self
);
393 lwes_emitter_destroy(rle
->emitter
);
399 static void lwesrb_emitter_create(struct _rb_lwes_emitter
*rle
)
403 if (rle
->ttl
== UINT32_MAX
)
404 rle
->emitter
= lwes_emitter_create(
405 rle
->address
, rle
->iface
, rle
->port
,
406 rle
->emit_heartbeat
, rle
->freq
);
408 rle
->emitter
= lwes_emitter_create_with_ttl(
409 rle
->address
, rle
->iface
, rle
->port
,
410 rle
->emit_heartbeat
, rle
->freq
, rle
->ttl
);
413 if (--gc_retry
== 0) {
417 rb_raise(rb_eRuntimeError
, "failed to create LWES emitter");
422 static VALUE
init_copy(VALUE dest
, VALUE obj
)
424 struct _rb_lwes_emitter
*dst
= _rle(dest
);
425 struct _rb_lwes_emitter
*src
= _rle(obj
);
427 memcpy(dst
, src
, sizeof(*dst
));
428 dst
->address
= my_strdup(src
->address
);
430 dst
->iface
= my_strdup(src
->iface
);
431 lwesrb_emitter_create(dst
);
433 assert(dst
->emitter
&& dst
->emitter
!= src
->emitter
&&
434 "emitter not a copy");
439 /* should only used internally by #initialize */
440 static VALUE
_create(VALUE self
, VALUE options
)
442 struct _rb_lwes_emitter
*rle
= _rle(self
);
443 VALUE address
, iface
, port
, heartbeat
, ttl
;
445 rle
->emit_heartbeat
= FALSE
;
447 rle
->ttl
= UINT32_MAX
; /* nobody sets a ttl this long, right? */
450 rb_raise(rb_eRuntimeError
, "already created lwes_emitter");
451 if (TYPE(options
) != T_HASH
)
452 rb_raise(rb_eTypeError
, "options must be a hash");
454 address
= rb_hash_aref(options
, ID2SYM(rb_intern("address")));
455 if (TYPE(address
) != T_STRING
)
456 rb_raise(rb_eTypeError
, ":address must be a string");
457 rle
->address
= my_strdup(RSTRING_PTR(address
));
459 iface
= rb_hash_aref(options
, ID2SYM(rb_intern("iface")));
460 switch (TYPE(iface
)) {
465 rle
->iface
= my_strdup(RSTRING_PTR(iface
));
468 rb_raise(rb_eTypeError
, ":iface must be a String or nil");
471 port
= rb_hash_aref(options
, ID2SYM(rb_intern("port")));
472 if (TYPE(port
) != T_FIXNUM
)
473 rb_raise(rb_eTypeError
, ":port must be a Fixnum");
474 rle
->port
= NUM2UINT(port
);
476 heartbeat
= rb_hash_aref(options
, ID2SYM(rb_intern("heartbeat")));
477 if (TYPE(heartbeat
) == T_FIXNUM
) {
478 int tmp
= NUM2INT(heartbeat
);
480 rb_raise(rb_eArgError
,":heartbeat > INT16_MAX seconds");
481 rle
->emit_heartbeat
= TRUE
;
482 rle
->freq
= (LWES_INT_16
)tmp
;
483 } else if (NIL_P(heartbeat
)) { /* do nothing, use defaults */
485 rb_raise(rb_eTypeError
, ":heartbeat must be a Fixnum or nil");
487 ttl
= rb_hash_aref(options
, ID2SYM(rb_intern("ttl")));
488 if (TYPE(ttl
) == T_FIXNUM
) {
489 unsigned LONG_LONG tmp
= NUM2ULL(ttl
);
490 if (tmp
>= UINT32_MAX
)
491 rb_raise(rb_eArgError
, ":ttl >= UINT32_MAX seconds");
492 rle
->ttl
= (LWES_U_INT_32
)tmp
;
493 } else if (NIL_P(ttl
)) { /* do nothing, no ttl */
495 rb_raise(rb_eTypeError
, ":ttl must be a Fixnum or nil");
497 lwesrb_emitter_create(rle
);
502 /* Init_lwes_ext will call this */
503 void lwesrb_init_emitter(void)
505 VALUE mLWES
= rb_define_module("LWES");
506 VALUE cLWES_Emitter
=
507 rb_define_class_under(mLWES
, "Emitter", rb_cObject
);
509 rb_define_method(cLWES_Emitter
, "<<", emitter_ltlt
, 1);
510 rb_define_method(cLWES_Emitter
, "emit", emitter_emit
, -1);
511 rb_define_method(cLWES_Emitter
, "_create", _create
, 1);
512 rb_define_method(cLWES_Emitter
, "close", emitter_close
, 0);
513 rb_define_method(cLWES_Emitter
, "initialize_copy", init_copy
, 1);
514 rb_define_alloc_func(cLWES_Emitter
, rle_alloc
);
515 LWESRB_MKID(TYPE_DB
);
516 LWESRB_MKID(TYPE_LIST
);
518 LWESRB_MKID(HAVE_ENCODING
);
521 id_enc
= rb_intern(LWES_ENCODING
);
522 sym_enc
= ID2SYM(id_enc
);
524 ENC
= rb_obj_freeze(rb_str_new2(LWES_ENCODING
));
525 rb_define_const(mLWES
, "ENCODING", ENC
);