properly emit sparse LWES::Events from Ruby Hashes
[lwes-ruby.git] / ext / lwes_ext / emitter.c
blob2cfb3bb1cca651332382f32b5f1518ca57012fbd
1 #include "lwes_ruby.h"
2 #ifdef HAVE_RUBY_UTIL_H
3 # include <ruby/util.h>
4 #else
5 # include "util.h"
6 #endif
8 static VALUE ENC; /* LWES_ENCODING */
9 static ID id_TYPE_DB, id_TYPE_LIST, id_NAME, id_HAVE_ENCODING;
10 static ID id_new, id_enc, id_size;
11 static VALUE sym_enc;
13 static void dump_name(char *name, LWES_BYTE_P buf, size_t *off)
15 if (marshall_SHORT_STRING(name, buf, MAX_MSG_SIZE, off) > 0)
16 return;
17 rb_raise(rb_eRuntimeError, "failed to dump name=%s", name);
20 static int dump_bool(char *name, VALUE val, LWES_BYTE_P buf, size_t *off)
22 dump_name(name, buf, off);
23 lwesrb_dump_type(LWES_BOOLEAN_TOKEN, buf, off);
24 return marshall_BOOLEAN(lwesrb_boolean(val), buf, MAX_MSG_SIZE, off);
27 static int dump_string(char *name, VALUE val, LWES_BYTE_P buf, size_t *off)
29 char *dst;
31 switch (TYPE(val)) {
32 case T_BIGNUM:
33 case T_FIXNUM:
34 val = rb_obj_as_string(val);
36 dst = StringValueCStr(val);
38 dump_name(name, buf, off);
39 lwesrb_dump_type(LWES_STRING_TOKEN, buf, off);
40 return marshall_LONG_STRING(dst, buf, MAX_MSG_SIZE, off);
43 static void dump_enc(VALUE enc, LWES_BYTE_P buf, size_t *off)
45 dump_name((char *)LWES_ENCODING, buf, off);
46 lwesrb_dump_num(LWES_INT_16_TOKEN, enc, buf, off);
49 /* the underlying struct for LWES::Emitter */
50 struct _rb_lwes_emitter {
51 struct lwes_emitter *emitter;
52 char *address;
53 char *iface;
54 LWES_U_INT_32 port;
55 LWES_BOOLEAN emit_heartbeat;
56 LWES_INT_16 freq;
57 LWES_U_INT_32 ttl;
60 /* gets the _rb_lwes_emitter struct pointer from self */
61 static struct _rb_lwes_emitter * _rle(VALUE self)
63 struct _rb_lwes_emitter *rle;
65 Data_Get_Struct(self, struct _rb_lwes_emitter, rle);
67 return rle;
70 /* GC automatically calls this when object is finalized */
71 static void rle_free(void *ptr)
73 struct _rb_lwes_emitter *rle = ptr;
75 if (rle->emitter)
76 lwes_emitter_destroy(rle->emitter);
77 xfree(rle->address);
78 xfree(rle->iface);
79 xfree(ptr);
82 /* called by the GC when object is allocated */
83 static VALUE rle_alloc(VALUE klass)
85 struct _rb_lwes_emitter *rle;
87 return Data_Make_Struct(klass, struct _rb_lwes_emitter,
88 NULL, rle_free, rle);
91 struct hash_memo {
92 size_t off;
93 LWES_BYTE_P buf;
97 * kv - Array:
98 * key => String,
99 * key => [ numeric_type, Numeric ],
100 * key => true,
101 * key => false,
102 * memo - lwes_event pointer
104 static VALUE event_hash_iter_i(VALUE kv, VALUE memo)
106 volatile VALUE raise_inspect;
107 struct hash_memo *hash_memo = (struct hash_memo *)NUM2ULONG(memo);
108 VALUE val;
109 VALUE name;
110 char *attr_name;
111 int rv = 0;
112 LWES_BYTE_P buf = hash_memo->buf;
113 size_t *off = &hash_memo->off;
114 VALUE *tmp;
116 if (TYPE(kv) != T_ARRAY || RARRAY_LEN(kv) != 2)
117 rb_raise(rb_eTypeError,
118 "hash iteration not giving key-value pairs");
119 tmp = RARRAY_PTR(kv);
120 name = tmp[0];
122 if (name == sym_enc) return Qnil; /* already dumped first */
124 name = rb_obj_as_string(name);
125 attr_name = StringValueCStr(name);
127 if (strcmp(attr_name, LWES_ENCODING) == 0)
128 return Qnil;
130 val = tmp[1];
132 switch (TYPE(val)) {
133 case T_TRUE:
134 case T_FALSE:
135 rv = dump_bool(attr_name, val, buf, off);
136 break;
137 case T_ARRAY:
138 dump_name(attr_name, buf, off);
139 lwesrb_dump_num_ary(val, buf, off);
140 return Qnil;
141 case T_STRING:
142 rv = dump_string(attr_name, val, buf, off);
143 break;
146 if (rv > 0)
147 return Qnil;
149 rb_raise(rb_eArgError, "unhandled type %s=%s",
150 attr_name, RAISE_INSPECT(val));
151 return Qfalse;
154 static VALUE emit_hash(VALUE self, VALUE name, VALUE event)
156 struct _rb_lwes_emitter *rle = _rle(self);
157 struct hash_memo hash_memo;
158 LWES_BYTE_P buf;
159 size_t *off;
160 VALUE memo = ULONG2NUM((unsigned long)&hash_memo);
161 VALUE enc;
162 LWES_U_INT_16 size = lwesrb_uint16(rb_funcall(event, id_size, 0, 0));
163 int rv;
164 char *event_name = StringValueCStr(name);
166 buf = hash_memo.buf = rle->emitter->buffer;
167 hash_memo.off = 0;
168 off = &hash_memo.off;
170 /* event name first */
171 dump_name(event_name, buf, off);
173 /* number of attributes second */
174 rv = marshall_U_INT_16(size, buf, MAX_MSG_SIZE, off);
175 if (rv <= 0)
176 rb_raise(rb_eRuntimeError, "failed to dump num_attrs");
178 /* dump encoding before other fields */
179 enc = rb_hash_aref(event, sym_enc);
180 if (NIL_P(enc))
181 enc = rb_hash_aref(event, ENC);
182 if (! NIL_P(enc))
183 dump_enc(enc, buf, off);
185 /* the rest of the fields */
186 rb_iterate(rb_each, event, event_hash_iter_i, memo);
188 if (lwes_emitter_emit_bytes(rle->emitter, buf, *off) < 0)
189 rb_raise(rb_eRuntimeError, "failed to emit event");
191 return event;
194 static void
195 marshal_field(
196 char *name,
197 LWES_TYPE type,
198 VALUE val,
199 LWES_BYTE_P buf,
200 size_t *off)
202 volatile VALUE raise_inspect;
204 switch (type) {
205 case LWES_TYPE_STRING:
206 if (dump_string(name, val, buf, off) > 0)
207 return;
208 break;
209 case LWES_TYPE_BOOLEAN:
210 if (dump_bool(name, val, buf, off) > 0)
211 return;
212 break;
213 default:
214 dump_name(name, buf, off);
215 lwesrb_dump_num(type, val, buf, off);
216 return;
219 rb_raise(rb_eRuntimeError, "failed to set %s=%s",
220 name, RAISE_INSPECT(val));
223 static void lwes_struct_class(
224 VALUE *event_class,
225 VALUE *name,
226 VALUE *type_list,
227 VALUE *have_enc,
228 VALUE event)
230 VALUE type_db;
232 *event_class = CLASS_OF(event);
233 type_db = rb_const_get(*event_class, id_TYPE_DB);
235 if (CLASS_OF(type_db) != cLWES_TypeDB)
236 rb_raise(rb_eArgError, "class does not have valid TYPE_DB");
238 *name = rb_const_get(*event_class, id_NAME);
239 Check_Type(*name, T_STRING);
240 *type_list = rb_const_get(*event_class, id_TYPE_LIST);
241 Check_Type(*type_list, T_ARRAY);
243 *have_enc = rb_const_get(*event_class, id_HAVE_ENCODING);
246 #if !defined(RSTRUCT_PTR) && defined(RSTRUCT)
247 # define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
248 #endif
249 static VALUE * rstruct_ptr(VALUE *ary, VALUE rstruct)
251 #ifdef RSTRUCT_PTR
252 return RSTRUCT_PTR(*ary = rstruct);
253 #else
254 *ary = rb_funcall(rstruct, rb_intern("to_a"), 0, 0);
255 return RARRAY_PTR(*ary);
256 #endif
259 static VALUE emit_struct(VALUE self, VALUE event)
261 VALUE event_class, name, type_list, have_enc;
262 struct _rb_lwes_emitter *rle = _rle(self);
263 LWES_BYTE_P buf = rle->emitter->buffer;
264 size_t off = 0;
265 long i;
266 VALUE *tmp;
267 LWES_U_INT_16 num_attr = 0;
268 size_t num_attr_off;
269 VALUE *flds;
270 char *str;
272 lwes_struct_class(&event_class, &name, &type_list, &have_enc, event);
274 /* event name */
275 str = StringValueCStr(name);
276 dump_name(str, buf, &off);
278 /* number of attributes, use a placeholder until we've iterated */
279 num_attr_off = off;
280 if (marshall_U_INT_16(0, buf, MAX_MSG_SIZE, &off) < 0)
281 rb_raise(rb_eRuntimeError,
282 "failed to marshal number_of_attributes");
284 /* dump encoding before other fields */
285 if (have_enc == Qtrue) {
286 VALUE enc = rb_funcall(event, id_enc, 0, 0);
287 if (! NIL_P(enc)) {
288 ++num_attr;
289 dump_enc(enc, buf, &off);
293 i = RARRAY_LEN(type_list);
294 flds = rstruct_ptr(&name, event);
295 tmp = RARRAY_PTR(type_list);
296 for (; --i >= 0; tmp++, flds++) {
297 /* inner: [ :field_sym, "field_name", type ] */
298 VALUE *inner = RARRAY_PTR(*tmp);
299 VALUE val;
300 LWES_TYPE type;
302 if (inner[0] == sym_enc) /* encoding was already dumped */
303 continue;
305 val = *flds;
306 if (NIL_P(val))
307 continue; /* LWES doesn't know nil */
309 str = StringValueCStr(inner[1]);
310 type = NUM2INT(inner[2]);
311 ++num_attr;
312 marshal_field(str, type, val, buf, &off);
315 /* now we've iterated, we can accurately give num_attr */
316 if (marshall_U_INT_16(num_attr, buf, MAX_MSG_SIZE, &num_attr_off) <= 0)
317 rb_raise(rb_eRuntimeError, "failed to marshal num_attr");
319 if (lwes_emitter_emit_bytes(rle->emitter, buf, off) < 0)
320 rb_raise(rb_eRuntimeError, "failed to emit event");
322 return event;
325 static VALUE emit_event(VALUE self, VALUE event)
327 struct lwes_event *e = lwesrb_get_event(event);
329 if (lwes_emitter_emit(_rle(self)->emitter, e) < 0)
330 rb_raise(rb_eRuntimeError, "failed to emit event");
332 return event;
335 * call-seq:
336 * emitter << event
338 * Emits the given +event+ which much be an LWES::Event or
339 * LWES::Struct-derived object
341 static VALUE emitter_ltlt(VALUE self, VALUE event)
343 if (rb_obj_is_kind_of(event, cLWES_Event)) {
344 return emit_event(self, event);
345 } else {
346 Check_Type(event, T_STRUCT);
348 return emit_struct(self, event);
353 * call-seq:
354 * emitter.emit("EventName", :foo => "HI")
355 * emitter.emit("EventName", :foo => [ :int32, 123 ])
356 * emitter.emit(EventClass, :foo => "HI")
357 * emitter.emit(event)
359 * Emits a hash. If EventName is given as a string, it will expect a hash
360 * as its second argument and will do its best to serialize a Ruby Hash
361 * to an LWES Event. If a type is ambiguous, a two-element array may be
362 * specified as its value, including the LWES type information and the
363 * Ruby value.
365 * If an EventClass is given, the second argument should be a hash with
366 * the values given to the class. This will emit the event named by
367 * EventClass.
369 * If only one argument is given, it behaves just like LWES::Emitter#<<
371 static VALUE emitter_emit(int argc, VALUE *argv, VALUE self)
373 volatile VALUE raise_inspect;
374 char *err;
375 VALUE name = Qnil;
376 VALUE event = Qnil;
377 argc = rb_scan_args(argc, argv, "11", &name, &event);
379 switch (TYPE(name)) {
380 case T_STRING:
381 if (TYPE(event) == T_HASH)
382 return emit_hash(self, name, event);
383 rb_raise(rb_eTypeError,
384 "second argument must be a hash when first "
385 "is a String");
386 case T_STRUCT:
387 if (argc >= 2)
388 rb_raise(rb_eArgError,
389 "second argument not allowed when first"
390 " is a Struct");
391 event = name;
392 return emit_struct(self, event);
393 case T_CLASS:
394 if (TYPE(event) != T_HASH)
395 rb_raise(rb_eTypeError,
396 "second argument must be a Hash when first"
397 " is a Class");
400 * we can optimize this so there's no intermediate
401 * struct created
403 event = rb_funcall(name, id_new, 1, event);
404 if (TYPE(event) == T_STRUCT)
405 return emit_struct(self, event);
406 if (rb_obj_is_kind_of(event, cLWES_Event))
407 return emit_event(self, event);
408 name = rb_class_name(name);
409 err = StringValuePtr(name);
410 rb_raise(rb_eArgError,
411 "%s created a bad event: %s",
412 err, RAISE_INSPECT(event));
413 default:
414 if (rb_obj_is_kind_of(name, cLWES_Event))
415 return emit_event(self, name);
416 rb_raise(rb_eArgError,
417 "bad argument: %s, must be a String, Struct or Class",
418 RAISE_INSPECT(name));
421 assert(0 && "should never get here");
422 return event;
426 * call-seq:
427 * emitter.close -> nil
429 * Destroys the associated lwes_emitter and the associated socket. This
430 * method is rarely needed as Ruby garbage collection will take care of
431 * closing for you, but may be useful in odd cases when it is desirable
432 * to release file descriptors ASAP.
434 static VALUE emitter_close(VALUE self)
436 struct _rb_lwes_emitter *rle = _rle(self);
438 if (rle->emitter)
439 lwes_emitter_destroy(rle->emitter);
440 rle->emitter = NULL;
442 return Qnil;
445 static void lwesrb_emitter_create(struct _rb_lwes_emitter *rle)
447 int gc_retry = 1;
448 retry:
449 if (rle->ttl == UINT32_MAX)
450 rle->emitter = lwes_emitter_create(
451 rle->address, rle->iface, rle->port,
452 rle->emit_heartbeat, rle->freq);
453 else
454 rle->emitter = lwes_emitter_create_with_ttl(
455 rle->address, rle->iface, rle->port,
456 rle->emit_heartbeat, rle->freq, rle->ttl);
458 if (!rle->emitter) {
459 if (--gc_retry == 0) {
460 rb_gc();
461 goto retry;
463 rb_raise(rb_eRuntimeError, "failed to create LWES emitter");
467 /* :nodoc: */
468 static VALUE init_copy(VALUE dest, VALUE obj)
470 struct _rb_lwes_emitter *dst = _rle(dest);
471 struct _rb_lwes_emitter *src = _rle(obj);
473 memcpy(dst, src, sizeof(*dst));
474 dst->address = ruby_strdup(src->address);
475 if (dst->iface)
476 dst->iface = ruby_strdup(src->iface);
477 lwesrb_emitter_create(dst);
479 assert(dst->emitter && dst->emitter != src->emitter &&
480 "emitter not a copy");
482 return dest;
485 /* :nodoc: should only used internally by #initialize */
486 static VALUE _create(VALUE self, VALUE options)
488 struct _rb_lwes_emitter *rle = _rle(self);
489 VALUE address, iface, port, heartbeat, ttl;
491 rle->emit_heartbeat = FALSE;
492 rle->freq = 0;
493 rle->ttl = UINT32_MAX; /* nobody sets a ttl this long, right? */
495 if (rle->emitter)
496 rb_raise(rb_eRuntimeError, "already created lwes_emitter");
497 if (TYPE(options) != T_HASH)
498 rb_raise(rb_eTypeError, "options must be a hash");
500 address = rb_hash_aref(options, ID2SYM(rb_intern("address")));
501 if (TYPE(address) != T_STRING)
502 rb_raise(rb_eTypeError, ":address must be a string");
503 rle->address = ruby_strdup(StringValueCStr(address));
505 iface = rb_hash_aref(options, ID2SYM(rb_intern("iface")));
506 switch (TYPE(iface)) {
507 case T_NIL:
508 rle->iface = NULL;
509 break;
510 case T_STRING:
511 rle->iface = ruby_strdup(StringValueCStr(iface));
512 break;
513 default:
514 rb_raise(rb_eTypeError, ":iface must be a String or nil");
517 port = rb_hash_aref(options, ID2SYM(rb_intern("port")));
518 if (TYPE(port) != T_FIXNUM)
519 rb_raise(rb_eTypeError, ":port must be a Fixnum");
520 rle->port = NUM2UINT(port);
522 heartbeat = rb_hash_aref(options, ID2SYM(rb_intern("heartbeat")));
523 if (TYPE(heartbeat) == T_FIXNUM) {
524 int tmp = NUM2INT(heartbeat);
525 if (tmp > INT16_MAX)
526 rb_raise(rb_eArgError,":heartbeat > INT16_MAX seconds");
527 rle->emit_heartbeat = TRUE;
528 rle->freq = (LWES_INT_16)tmp;
529 } else if (NIL_P(heartbeat)) { /* do nothing, use defaults */
530 } else
531 rb_raise(rb_eTypeError, ":heartbeat must be a Fixnum or nil");
533 ttl = rb_hash_aref(options, ID2SYM(rb_intern("ttl")));
534 if (TYPE(ttl) == T_FIXNUM) {
535 unsigned LONG_LONG tmp = NUM2ULL(ttl);
536 if (tmp >= UINT32_MAX)
537 rb_raise(rb_eArgError, ":ttl >= UINT32_MAX seconds");
538 rle->ttl = (LWES_U_INT_32)tmp;
539 } else if (NIL_P(ttl)) { /* do nothing, no ttl */
540 } else
541 rb_raise(rb_eTypeError, ":ttl must be a Fixnum or nil");
543 lwesrb_emitter_create(rle);
545 return self;
548 /* Init_lwes_ext will call this */
549 void lwesrb_init_emitter(void)
551 VALUE mLWES = rb_define_module("LWES");
552 VALUE cLWES_Emitter =
553 rb_define_class_under(mLWES, "Emitter", rb_cObject);
555 rb_define_method(cLWES_Emitter, "<<", emitter_ltlt, 1);
556 rb_define_method(cLWES_Emitter, "emit", emitter_emit, -1);
557 rb_define_method(cLWES_Emitter, "_create", _create, 1);
558 rb_define_method(cLWES_Emitter, "close", emitter_close, 0);
559 rb_define_method(cLWES_Emitter, "initialize_copy", init_copy, 1);
560 rb_define_alloc_func(cLWES_Emitter, rle_alloc);
561 LWESRB_MKID(TYPE_DB);
562 LWESRB_MKID(TYPE_LIST);
563 LWESRB_MKID(NAME);
564 LWESRB_MKID(HAVE_ENCODING);
565 LWESRB_MKID(new);
566 LWESRB_MKID(size);
567 id_enc = rb_intern(LWES_ENCODING);
568 sym_enc = ID2SYM(id_enc);
570 ENC = rb_obj_freeze(rb_str_new2(LWES_ENCODING));
573 * the key in an LWES::Event to designate the encoding of
574 * an event, currently "enc"
576 rb_define_const(mLWES, "ENCODING", ENC);