From 6a6c625277221bd78ea5a0f0fb3a6c58a124bf4c Mon Sep 17 00:00:00 2001 From: Anthony Molinaro Date: Thu, 23 Dec 2010 05:51:07 +0000 Subject: [PATCH] initial version git-svn-id: https://lwes.svn.sourceforge.net/svnroot/lwes/lwes-erlang/trunk@631 a2f82657-cdd2-4550-bd36-68a8e7111808 --- AUTHORS | 1 + COPYING | 1 + ChangeLog | 2 + LICENSE | 34 +++++++ Makefile.am.local | 2 + NEWS | 0 README | 0 bootstrap | 17 ++++ configure.ac.local | 2 + doc/Makefile.am.local | 1 + doc/overview.edoc | 4 + fw-pkgin/Makefile.am.local | 1 + fw-pkgin/config | 94 +++++++++++++++++++ fw-pkgin/post-install | 21 +++++ fw-pkgin/post-remove | 28 ++++++ fw-pkgin/pre-install | 23 +++++ fw-pkgin/pre-remove | 28 ++++++ fw-pkgin/start | 10 ++ fw-pkgin/stop | 10 ++ src/Makefile.am.local | 16 ++++ src/lwes.erl | 104 +++++++++++++++++++++ src/lwes.hrl | 26 ++++++ src/lwes_app.erl | 40 ++++++++ src/lwes_channel.erl | 193 ++++++++++++++++++++++++++++++++++++++ src/lwes_channel_manager.erl | 102 +++++++++++++++++++++ src/lwes_channel_sup.erl | 52 +++++++++++ src/lwes_event.erl | 214 +++++++++++++++++++++++++++++++++++++++++++ src/lwes_internal.hrl | 27 ++++++ src/lwes_sup.erl | 57 ++++++++++++ src/lwes_util.erl | 38 ++++++++ tests/Makefile.am.local | 9 ++ 31 files changed, 1157 insertions(+) create mode 100644 AUTHORS create mode 100644 COPYING create mode 100644 ChangeLog create mode 100644 LICENSE create mode 100644 Makefile.am.local create mode 100644 NEWS create mode 100644 README create mode 100755 bootstrap create mode 100644 configure.ac.local create mode 100644 doc/Makefile.am.local create mode 100644 doc/overview.edoc create mode 100644 fw-pkgin/Makefile.am.local create mode 100644 fw-pkgin/config create mode 100755 fw-pkgin/post-install create mode 100755 fw-pkgin/post-remove create mode 100755 fw-pkgin/pre-install create mode 100755 fw-pkgin/pre-remove create mode 100755 fw-pkgin/start create mode 100755 fw-pkgin/stop create mode 100644 src/Makefile.am.local create mode 100644 src/lwes.erl create mode 100644 src/lwes.hrl create mode 100644 src/lwes_app.erl create mode 100644 src/lwes_channel.erl create mode 100644 src/lwes_channel_manager.erl create mode 100644 src/lwes_channel_sup.erl create mode 100644 src/lwes_event.erl create mode 100644 src/lwes_internal.hrl create mode 100644 src/lwes_sup.erl create mode 100644 src/lwes_util.erl create mode 100644 tests/Makefile.am.local diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..41bbf40 --- /dev/null +++ b/AUTHORS @@ -0,0 +1 @@ +Anthony Molinaro diff --git a/COPYING b/COPYING new file mode 100644 index 0000000..7ff097c --- /dev/null +++ b/COPYING @@ -0,0 +1 @@ +See LICENSE diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 0000000..3b0d917 --- /dev/null +++ b/ChangeLog @@ -0,0 +1,2 @@ +Version 0.0.0 (molinaro) + * Initial version diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d5afb92 --- /dev/null +++ b/LICENSE @@ -0,0 +1,34 @@ +Software Copyright License Agreement (BSD License) + +Copyright (c) 2010, OpenX Inc. +All rights reserved. + +Redistribution and use of this software in source and binary forms, +with or without modification, are permitted provided that the following +conditions are met: + +* Redistributions of source code must retain the above + copyright notice, this list of conditions and the + following disclaimer. + +* Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the + following disclaimer in the documentation and/or other + materials provided with the distribution. + +* Neither the name of Yahoo! Inc. nor the names of its + contributors may be used to endorse or promote products + derived from this software without specific prior + written permission of Yahoo! Inc. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Makefile.am.local b/Makefile.am.local new file mode 100644 index 0000000..a2c69b5 --- /dev/null +++ b/Makefile.am.local @@ -0,0 +1,2 @@ +# put whatever (auto)make commands here, they will be included from Makefile.am +EXTRA_DIST += LICENSE diff --git a/NEWS b/NEWS new file mode 100644 index 0000000..e69de29 diff --git a/README b/README new file mode 100644 index 0000000..e69de29 diff --git a/bootstrap b/bootstrap new file mode 100755 index 0000000..037a54b --- /dev/null +++ b/bootstrap @@ -0,0 +1,17 @@ +#! /bin/sh + +if test -d fw/bin + then + PATH="`pwd`/fw/bin:$PATH" + export PATH + fi + +fwb=`which fw-bootstrap` + +if test -z "$fwb" + then + echo "bootstrap: fatal: fw-bootstrap not installed or not in PATH" 1>&2 + exit 1 + fi + +"$fwb" --fw_version "0.2.11" --name lwes-erlang --revision svn --template erlang --svn_project_path https://lwes.svn.sourceforge.net/svnroot/lwes/lwes-erlang/trunk --svn_tag_root https://lwes.svn.sourceforge.net/svnroot/lwes/lwes-erlang/tags "$@" diff --git a/configure.ac.local b/configure.ac.local new file mode 100644 index 0000000..f5fb31a --- /dev/null +++ b/configure.ac.local @@ -0,0 +1,2 @@ +dnl -- include additional autoconf commands here +dnl -- do not include AC_OUTPUT, this is called for you diff --git a/doc/Makefile.am.local b/doc/Makefile.am.local new file mode 100644 index 0000000..c6d5812 --- /dev/null +++ b/doc/Makefile.am.local @@ -0,0 +1 @@ +# put whatever (auto)make commands here, they will be included from Makefile.am diff --git a/doc/overview.edoc b/doc/overview.edoc new file mode 100644 index 0000000..2358008 --- /dev/null +++ b/doc/overview.edoc @@ -0,0 +1,4 @@ +@author Your Name Here +@copyright 2008 Whatever +@title Your great Erlang project! +@doc This project is so great, perhaps you should write some documentation for it. diff --git a/fw-pkgin/Makefile.am.local b/fw-pkgin/Makefile.am.local new file mode 100644 index 0000000..c6d5812 --- /dev/null +++ b/fw-pkgin/Makefile.am.local @@ -0,0 +1 @@ +# put whatever (auto)make commands here, they will be included from Makefile.am diff --git a/fw-pkgin/config b/fw-pkgin/config new file mode 100644 index 0000000..20a4d7c --- /dev/null +++ b/fw-pkgin/config @@ -0,0 +1,94 @@ +# The FW_PACKAGE_MAINTAINER field is populated with the environment +# variable FW_PACKAGE_DEFAULT_MAINTAINER if non-empty at init time + +FW_PACKAGE_NAME="lwes-erlang" +FW_PACKAGE_VERSION="0.0.0" +FW_PACKAGE_MAINTAINER="Anthony Molinaro " +FW_PACKAGE_SHORT_DESCRIPTION="Erlang Bindings for LWES" +FW_PACKAGE_DESCRIPTION=`cat README` +FW_PACKAGE_ARCHITECTURE_DEPENDENT="1" + +# Dependency information. The native syntax corresponds to Debian, +# http://www.debian.org/doc/debian-policy/ch-relationships.html +# Section 7.1 "Syntax of Relationship Fields" +# +# For other packaging systems, the syntax is translated for you. + +FW_PACKAGE_DEPENDS="" +FW_PACKAGE_CONFLICTS="" +FW_PACKAGE_PROVIDES="" +FW_PACKAGE_REPLACES="" +FW_PACKAGE_SUGGESTS="" + +FW_PACKAGE_BUILD_DEPENDS="fw-template-erlang (>= 0.1.52)" +FW_PACKAGE_BUILD_CONFLICTS="" + +# dupload is used for submitting debian packages to a package archive +# The FW_DUPLOAD_ARGS field is populated with the environment variable +# FW_DEFAULT_DUPLOAD_ARGS if non-empty at init time + +FW_DUPLOAD_ARGS=${FW_DUPLOAD_ARGS-"-q"} + +# scp+createrepo is used for submitting rpm packages to a package archive +# The FW_RPM_REPO_USER, FW_RPM_REPO_HOST, FW_RPM_REPO_BASEDIR, +# and FW_RPM_POSTCREATEREPO_COMMANDS variables are populated with +# FW_RPM_REPO_USER_DEFAULT, FW_RPM_REPO_HOST_DEFAULT, +# FW_RPM_REPO_BASEDIR_DEFAULT, and FW_RPM_POSTCREATEREPO_COMMANDS_DEFAULT +# respectively if non-empty at init time + +FW_RPM_REPO_USER=${FW_RPM_REPO_USER-"`whoami`"} +FW_RPM_REPO_HOST=${FW_RPM_REPO_HOST-"localhost"} +FW_RPM_REPO_BASEDIR=${FW_RPM_REPO_BASEDIR-""} +FW_RPM_CREATEREPO_ARGS=${FW_RPM_CREATEREPO_ARGS-"-q --database"} + +# this variable controls whether createrepo is run incrementally (--update). +# possible settings are yes (always do it), no (never do it), and +# auto (do it if the repository has been previously initialized) +FW_RPM_CREATEREPO_INCREMENTAL=${FW_RPM_CREATEREPO_INCREMENTAL-"auto"} + +# these commands will be run after a successful createrepo run +FW_RPM_POSTCREATEREPO_COMMANDS=${FW_RPM_POSTCREATEREPO_COMMANDS-"true"} +# here's a suggestion: +# FW_RPM_POSTCREATEREPO_COMMANDS="gpg --detach-sign --armor repodata/repomd.xml" + +# set to the directory in which version-named tags will be created +FW_SUBVERSION_TAG_ROOT="https://lwes.svn.sourceforge.net/svnroot/lwes/lwes-erlang/tags" + +# uncomment and set manually if you want the application name to be different +# from FW_PACKAGE_NAME, application name must consist of characters [a-zA-Z_] +FW_ERL_APP_NAME="lwes" + +# uncomment and set manually if autodetection of modules is incorrect. +# should be an erlang expression which evaluates to a list. +# FW_ERL_APP_MODULES="[]" + +# uncomment and set manually if autodetection of registered processes is incorrect +# should be an erlang expression which evaluates to a list. +# FW_ERL_APP_REGISTERED="[]" + +# uncomment and set manually if autodetection of start module is incorrect +# should be an erlang expression which evaluates to an atom. +# FW_ERL_APP_START_MODULE="" + +# uncomment to define start args to the start module. should be an erlang +# expression which evaluates to a list. this only has an effect if +# FW_ERL_APP_START_MODULE is set manually. +# FW_ERL_APP_START_ARGS="[]" + +# uncomment if the module line being generated is incorrect and you want +# to override it. +# FW_ERL_APP_MOD_LINE="{ mod, { $FW_ERL_APP_START_MODULE, $FW_ERL_APP_START_ARGS } }" + +# uncomment to define the application environment variables. should be an +# erlang expression which evaluates to a list. +# FW_ERL_APP_ENVIRONMENT="[]" + +# uncomment to indicate additional OTP applications (besides kernel and stdlib) +# that this application depends upon. should be an erlang expression which +# evaluates to a list. +# FW_ERL_PREREQ_APPLICATIONS_EXTRA="[]" + +# uncomment to add arbitrary extra content to the app file, e.g., an +# included application directive. should be an erlang expression which +# evaluates to a proplist (list of key-value tuple pairs). +# FW_ERL_APP_EXTRA="[]" diff --git a/fw-pkgin/post-install b/fw-pkgin/post-install new file mode 100755 index 0000000..39d1594 --- /dev/null +++ b/fw-pkgin/post-install @@ -0,0 +1,21 @@ +#! /bin/sh + +set -e + +#--------------------------------------------------------------------- +# post-install +# +# Executed after the package is installed. +# +# http://code.google.com/p/fwtemplates/wiki/PackageHooks +#--------------------------------------------------------------------- + +case "$1" in + configure) + # most recently configured version is $2 (possibly empty string) + ;; + *) + ;; +esac + +exit 0 diff --git a/fw-pkgin/post-remove b/fw-pkgin/post-remove new file mode 100755 index 0000000..c9290f2 --- /dev/null +++ b/fw-pkgin/post-remove @@ -0,0 +1,28 @@ +#! /bin/sh + +set -e + +#--------------------------------------------------------------------- +# post-remove +# +# Executed after the package is removed. +# +# http://code.google.com/p/fwtemplates/wiki/PackageHooks +#--------------------------------------------------------------------- + +case "$1" in + upgrade) + # defer to newer package's script + exit 1 + ;; + failed-upgrade) + # actually handle the upgrade here + # old-version is $2 + ;; + remove) + ;; + *) + ;; +esac + +exit 0 diff --git a/fw-pkgin/pre-install b/fw-pkgin/pre-install new file mode 100755 index 0000000..f6001be --- /dev/null +++ b/fw-pkgin/pre-install @@ -0,0 +1,23 @@ +#! /bin/sh + +set -e + +#--------------------------------------------------------------------- +# pre-install +# +# Executed before the package is installed. +# +# http://code.google.com/p/fwtemplates/wiki/PackageHooks +#--------------------------------------------------------------------- + +case "$1" in + install) + ;; + upgrade) + # old version is $2 + ;; + *) + ;; +esac + +exit 0 diff --git a/fw-pkgin/pre-remove b/fw-pkgin/pre-remove new file mode 100755 index 0000000..56e0e52 --- /dev/null +++ b/fw-pkgin/pre-remove @@ -0,0 +1,28 @@ +#! /bin/sh + +set -e + +#--------------------------------------------------------------------- +# pre-remove +# +# Executed before the package is removed. +# +# http://code.google.com/p/fwtemplates/wiki/PackageHooks +#--------------------------------------------------------------------- + +case "$1" in + upgrade) + # defer to newer package's script + exit 1 + ;; + failed-upgrade) + # actually handle the upgrade here + # old-version is $2 + ;; + remove) + ;; + *) + ;; +esac + +exit 0 diff --git a/fw-pkgin/start b/fw-pkgin/start new file mode 100755 index 0000000..dc16300 --- /dev/null +++ b/fw-pkgin/start @@ -0,0 +1,10 @@ +#! /bin/sh + +#--------------------------------------------------------------------- +# start +# +# Executed when the package (service) is started up. +# Not supported by all package formats. +#--------------------------------------------------------------------- + +exit 0 diff --git a/fw-pkgin/stop b/fw-pkgin/stop new file mode 100755 index 0000000..665484d --- /dev/null +++ b/fw-pkgin/stop @@ -0,0 +1,10 @@ +#! /bin/sh + +#--------------------------------------------------------------------- +# start +# +# Executed when the package (service) is shut down. +# Not supported by all package formats. +#--------------------------------------------------------------------- + +exit 0 diff --git a/src/Makefile.am.local b/src/Makefile.am.local new file mode 100644 index 0000000..09203d0 --- /dev/null +++ b/src/Makefile.am.local @@ -0,0 +1,16 @@ +# put whatever (auto)make commands here, they will be included from Makefile.am + +dist_erlappsrc_DATA = \ + lwes_internal.hrl \ + $(wildcard *.erl) + +dist_erlappinclude_DATA = \ + lwes.hrl + +erlappebin_SCRIPTS = \ + @FW_ERL_APP_NAME@.app \ + $(patsubst %.erl, %.beam, $(dist_erlappsrc_DATA)) + +check_DATA = \ + .dialyzer_ok \ + .@FW_ERL_APP_NAME@.app_ok diff --git a/src/lwes.erl b/src/lwes.erl new file mode 100644 index 0000000..7384711 --- /dev/null +++ b/src/lwes.erl @@ -0,0 +1,104 @@ +%%% +%%% Light Weight Event System (LWES) +%%% +%%% Creating Events +%%% Event0 = lwes_event:new ("MyEvent"), +%%% Event1 = lwes_event:set_uint16 (Event0, "MyUint16", 25), +%%% +%%% Emitting +%%% +%%% Channel = lwes:open (emitter, Ip, Port) +%%% ok = lwes:emit (Channel, Event1). +%%% +%%% Listening via callback +%%% +%%% Channel = lwes:open (listener, Ip, Port) +%%% lwes:listen (Channel, Type, Fun, Accum). +%%% +%%% Fun is called for each event +%%% +%%% Closing channel +%%% +%%% lwes:close (Channel) + +-module (lwes). + +-include_lib ("lwes.hrl"). + +-ifdef(HAVE_EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export ([ open/3, + emit/2, + listen/4, + close/1 ]). + +%%==================================================================== +%% API functions +%%==================================================================== +open (Type, Ip, Port) -> + try check_args (Type, Ip, Port) of + { T, I, P } -> lwes_channel:open (T, I, P) + catch + _:_ -> {error, cant_open} + end. + +emit (Channel, Event) -> + lwes_channel:send_to (Channel, lwes_event:to_binary (Event)). + +% +% Callback function - function is called with an event in given format +% and the current state, it should return the next +% state +% +% Type is one of +% +% raw - callback is given raw udp structure, use lwes_event:from_udp to +% turn into event +% list - callback is given an #lwes_event record where the name is a +% binary, and the attributes is a proplist where keys are binaries, +% and values are either integers (for lwes int types), binaries +% (for lwes strings), true|false atoms (for lwes booleans), +% or 4-tuples (for lwes ip addresses) +% tagged - callback is given an #lwes_event record where the name is a +% binary, and the attributes are 3-tuples with the first element +% the type of data, the second the key as a binary and the +% third the values as in the list format +% +% Initial State is whatever you want +listen (Channel, CallbackFunction, EventType, CallbackInitialState) + when is_function (CallbackFunction, 2), + EventType =:= raw ; EventType =:= tagged ; EventType =:= list -> + lwes_channel:register_callback (Channel, CallbackFunction, + EventType, CallbackInitialState). + +close (Channel) -> + lwes_channel:close (Channel). + +%%==================================================================== +%% Internal functions +%%==================================================================== + +check_args (Type, Ip, Port) -> + { normalize_type (Type), lwes_util:normalize_ip (Ip), normalize_port (Port) }. + +normalize_port (Port) when is_integer (Port), Port >= 0, Port =< 65535 -> + Port; +normalize_port (Port) when is_list (Port) -> + list_to_integer (Port); +normalize_port (_) -> + erlang:error (badarg). + +normalize_type (Type) when Type =:= emitter; Type =:= listener -> + Type; +normalize_type (_) -> + erlang:error (badarg). + +%%==================================================================== +%% Test functions +%%==================================================================== +-ifdef(EUNIT). + +-endif. diff --git a/src/lwes.hrl b/src/lwes.hrl new file mode 100644 index 0000000..2292db2 --- /dev/null +++ b/src/lwes.hrl @@ -0,0 +1,26 @@ +-ifndef(_lwes_included). +-define(_lwes_included, yup). + +-record (lwes_event, {name, attrs = []}). + +-define (LWES_TYPE_U_INT_16, 1). +-define (LWES_TYPE_INT_16, 2). +-define (LWES_TYPE_U_INT_32, 3). +-define (LWES_TYPE_INT_32, 4). +-define (LWES_TYPE_STRING, 5). +-define (LWES_TYPE_IP_ADDR, 6). +-define (LWES_TYPE_INT_64, 7). +-define (LWES_TYPE_U_INT_64, 8). +-define (LWES_TYPE_BOOLEAN, 9). + +-define (LWES_U_INT_16, uint16). +-define (LWES_INT_16, int16). +-define (LWES_U_INT_32, uint32). +-define (LWES_INT_32, int32). +-define (LWES_STRING, string). +-define (LWES_IP_ADDR, ip_addr). +-define (LWES_INT_64, int64). +-define (LWES_U_INT_64, uint64). +-define (LWES_BOOLEAN, boolean). + +-endif. diff --git a/src/lwes_app.erl b/src/lwes_app.erl new file mode 100644 index 0000000..0bbb851 --- /dev/null +++ b/src/lwes_app.erl @@ -0,0 +1,40 @@ +-module (lwes_app). + +-behaviour (application). + +-ifdef(HAVE_EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export([start/0]). + +%% application callbacks +-export ([start/2, stop/1]). + +%%==================================================================== +%% API functions +%%==================================================================== +start () -> + [application:start(App) || App <- [sasl, lwes_erlang]]. + +%%==================================================================== +%% application callbacks +%%==================================================================== +start (_Type, _Args) -> + case lwes_sup:start_link() of + {ok, Pid} -> + {ok, Pid}; + Error -> + Error + end. + +stop (_State) -> + ok. + +%%==================================================================== +%% Test functions +%%==================================================================== +-ifdef(EUNIT). + +-endif. diff --git a/src/lwes_channel.erl b/src/lwes_channel.erl new file mode 100644 index 0000000..54e1987 --- /dev/null +++ b/src/lwes_channel.erl @@ -0,0 +1,193 @@ +-module (lwes_channel). + +-behaviour (gen_server). + +-include_lib ("lwes.hrl"). + +-ifdef(HAVE_EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export ([ start_link/1, + open/3, + register_callback/4, + send_to/2, + close/1 + ]). + +%% gen_server callbacks +-export ([ init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 + ]). + +-record (state, {socket, channel, type, callback}). +-record (callback, {function, format, state}). +-record (channel, {ip, port, is_multicast, type, ref}). + +%%==================================================================== +%% API functions +%%==================================================================== +start_link (Channel) -> + gen_server:start_link (?MODULE, [Channel], []). + +open (Type, Ip, Port ) -> + Channel = #channel { + ip = Ip, + port = Port, + is_multicast = is_multicast (Ip), + type = Type, + ref = make_ref () + }, + { ok, _Pid } = lwes_channel_manager:open_channel (Channel), + Channel. + +register_callback (Channel, CallbackFunction, EventType, CallbackState) -> + find_and_call ( Channel, + { register, CallbackFunction, EventType, CallbackState }). + +send_to (Channel, Msg) -> + find_and_call (Channel, { send, Msg }). + +close (Channel) -> + find_and_cast (Channel, stop). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== +init ([ Channel = #channel { + ip = Ip, + port = Port, + is_multicast = IsMulticast, + type = Type + } + ]) -> + { ok, Socket }= + case {Type, IsMulticast} of + {listener, true} -> + gen_udp:open ( Port, + [ { reuseaddr, true }, + { ip, Ip }, + { multicast_ttl, 4 }, + { multicast_loop, false }, + { add_membership, {Ip, {0,0,0,0}}}, + binary + ]); + {listener, false} -> + gen_udp:open ( Port, [ binary ]); + {_, _} -> + gen_udp:open ( 0, [ binary ]) + end, + lwes_channel_manager:register_channel (Channel, self()), + { ok, #state { socket = Socket, + channel = Channel, + type = Type + } + }. + +handle_call ({ register, Function, Format, Accum }, + _From, + State = #state { + channel = #channel {type = listener } + }) -> + { reply, + ok, + State#state { callback = #callback { function = Function, + format = Format, + state = Accum } } }; + +handle_call ({ send, Packet }, + _From, + State = #state { + socket = Socket, + channel = #channel { ip = Ip, port = Port } + }) -> + { reply, + gen_udp:send (Socket, Ip, Port, Packet ), + State }; + +handle_call (Request, From, State) -> + error_logger:warning_msg + ("lwes_channel unrecognized call ~p from ~p~n",[Request, From]), + { reply, ok, State }. + +handle_cast (stop, State) -> + {stop, normal, State}; +handle_cast (Request, State) -> + error_logger:warning_msg + ("lwes_channel unrecognized cast ~p~n",[Request]), + { noreply, State }. + +% skip if we don't have a handler +handle_info ( {udp, _, _, _, _}, + State = #state { + type = listener, + callback = undefined + } ) -> + { noreply, State }; + +handle_info ( Packet = {udp, _, _, _, _}, + State = #state { + type = listener, + callback = #callback { function = Function, + format = Format, + state = CbState } + } ) -> + Event = + case Format of + raw -> Packet; + _ -> lwes_event:from_udp_packet (Packet, Format) + end, + NewCbState = Function (Event, CbState), + { noreply, + State#state { callback = #callback { function = Function, + format = Format, + state = NewCbState } } + }; + +handle_info ( Request, State) -> + error_logger:warning_msg + ("lwes_channel unrecognized info ~p~n",[Request]), + {noreply, State}. + +terminate (_Reason, #state {socket = Socket, channel = Channel}) -> + gen_udp:close (Socket), + lwes_channel_manager:unregister_channel (Channel). + +code_change (_OldVsn, State, _Extra) -> + {ok, State}. + +%%==================================================================== +%% Internal functions +%%==================================================================== +is_multicast ({N1, _, _, _}) when N1 >= 224, N1 =< 239 -> + true; +is_multicast (_) -> + false. + +find_and_call (Channel, Msg) -> + case lwes_channel_manager:find_channel (Channel) of + {error, not_open} -> + {error, not_open}; + Pid -> + gen_server:call ( Pid, Msg ) + end. + +find_and_cast (Channel, Msg) -> + case lwes_channel_manager:find_channel (Channel) of + {error, not_open} -> + {error, not_open}; + Pid -> + gen_server:cast ( Pid, Msg ) + end. + +%%==================================================================== +%% Test functions +%%==================================================================== +-ifdef(EUNIT). + +-endif. diff --git a/src/lwes_channel_manager.erl b/src/lwes_channel_manager.erl new file mode 100644 index 0000000..c8ae17e --- /dev/null +++ b/src/lwes_channel_manager.erl @@ -0,0 +1,102 @@ +-module (lwes_channel_manager). + +-behaviour (gen_server). + +-include_lib ("lwes.hrl"). + +-ifdef(HAVE_EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export ([ start_link/0, + open_channel/1, + register_channel/2, + unregister_channel/1, + find_channel/1, + close_channel/1 + ]). + +%% gen_server callbacks +-export ([ init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 + ]). + +-define (TABLE, lwes_channels). +-record (state, {}). + +%%==================================================================== +%% API +%%==================================================================== +start_link () -> + gen_server:start_link ( { local, ?MODULE }, ?MODULE, [], []). + +open_channel (Channel) -> + lwes_channel_sup:open_channel (Channel). + +register_channel (Channel, Pid) -> + gen_server:call (?MODULE, {reg, Channel, Pid}). + +unregister_channel (Channel) -> + gen_server:call (?MODULE, {unreg, Channel}). + +find_channel (Channel) -> + case ets:lookup (?TABLE, Channel) of + [] -> {error, not_open} ; + [{_Channel, Pid}] -> Pid + end. + +close_channel (Channel) -> + gen_server:call (find_channel (Channel), stop). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== +init ([]) -> + ets:new (?TABLE, [ named_table + % FIXME: in versions up to R14B, dialyzer doesn't like the next + % line, so commented out for the moment +% { read_concurrency, true }, + ]), + { ok, #state {} }. + +handle_call ({reg, Key, Val}, _From, State) -> + { reply, ets:insert (?TABLE, {Key, Val}), State }; +handle_call ({unreg, Key}, _From, State) -> + {reply, ets:delete (?TABLE, Key), State }; +handle_call (Request, From, State) -> + error_logger:warning_msg + ("lwes_channel_manager unrecognized call ~p from ~p~n",[Request, From]), + { reply, ok, State }. + +handle_cast (Request, State) -> + error_logger:warning_msg + ("lwes_channel_manager unrecognized cast ~p~n",[Request]), + { noreply, State }. + +handle_info (Request, State) -> + error_logger:warning_msg + ("lwes_channel_manager unrecognized info ~p~n",[Request]), + {noreply, State}. + +terminate (_Reason, _State) -> + ets:delete (?TABLE), + ok. + +code_change (_OldVsn, State, _Extra) -> + {ok, State}. + +%%==================================================================== +%% Internal functions +%%==================================================================== + +%%==================================================================== +%% Test functions +%%==================================================================== +-ifdef(EUNIT). + +-endif. diff --git a/src/lwes_channel_sup.erl b/src/lwes_channel_sup.erl new file mode 100644 index 0000000..461e827 --- /dev/null +++ b/src/lwes_channel_sup.erl @@ -0,0 +1,52 @@ +-module (lwes_channel_sup). + +-behaviour (supervisor). + +-ifdef(HAVE_EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export ([ start_link/0, + open_channel/1 ]). + +%% supervisor callbacks +-export ([ init/1 ]). + +%%==================================================================== +%% API functions +%%==================================================================== +%% @spec start_link() -> ServerRet +%% @doc API for starting the supervisor. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +open_channel (Channel) -> + supervisor:start_child (?MODULE, [Channel]). + +%%==================================================================== +%% supervisor callbacks +%%==================================================================== +%% @spec init([]) -> SupervisorTree +%% @doc supervisor callback. +init([]) -> + { ok, + { + {simple_one_for_one, 10, 10}, + [ { lwes_channel, + {lwes_channel, start_link, []}, + transient, + 2000, + worker, + [lwes_channel] + } + ] + } + }. + +%%==================================================================== +%% Test functions +%%==================================================================== +-ifdef(EUNIT). + +-endif. diff --git a/src/lwes_event.erl b/src/lwes_event.erl new file mode 100644 index 0000000..b317eb0 --- /dev/null +++ b/src/lwes_event.erl @@ -0,0 +1,214 @@ +-module (lwes_event). + +-include_lib ("lwes.hrl"). +-include_lib ("lwes_internal.hrl"). + +-ifdef(HAVE_EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export([new/1, + set_uint16/3, + set_int16/3, + set_uint32/3, + set_int32/3, + set_uint64/3, + set_int64/3, + set_string/3, + set_ip_addr/3, + set_boolean/3, + to_binary/1, + from_udp_packet/2, + from_binary/2]). + +%%==================================================================== +%% API +%%==================================================================== +new (Name) -> + #lwes_event { name = Name }. + +set_int16 (E = #lwes_event { attrs = A }, K, V) when ?is_int16 (V) -> + E#lwes_event { attrs = [ { ?LWES_INT_16, K, V } | A ] }. +set_uint16 (E = #lwes_event { attrs = A }, K, V) when ?is_uint16 (V) -> + E#lwes_event { attrs = [ { ?LWES_U_INT_16, K, V } | A ] }. +set_int32 (E = #lwes_event { attrs = A}, K, V) when ?is_int32 (V) -> + E#lwes_event { attrs = [ { ?LWES_INT_32, K, V } | A ] }. +set_uint32 (E = #lwes_event { attrs = A}, K, V) when ?is_uint32 (V) -> + E#lwes_event { attrs = [ { ?LWES_U_INT_32, K, V } | A ] }. +set_int64 (E = #lwes_event { attrs = A}, K, V) when ?is_int64 (V) -> + E#lwes_event { attrs = [ { ?LWES_INT_64, K, V } | A ] }. +set_uint64 (E = #lwes_event { attrs = A}, K, V) when ?is_uint64 (V) -> + E#lwes_event { attrs = [ { ?LWES_U_INT_64, K, V } | A ] }. +set_boolean (E = #lwes_event { attrs = A}, K, V) when is_boolean (V) -> + E#lwes_event { attrs = [ { ?LWES_BOOLEAN, K, V } | A ] }. +set_string (E = #lwes_event { attrs = A}, K, V) when ?is_string (V) -> + E#lwes_event { attrs = [ { ?LWES_STRING, K, V } | A ] }. +set_ip_addr (E = #lwes_event { attrs = A}, K, V) -> + Ip = lwes_util:normalize_ip (V), + E#lwes_event { attrs = [ { ?LWES_IP_ADDR, K, Ip } | A ] }. + +to_binary (#lwes_event { name = EventName, attrs = AttrList }) -> + NumAttrs = length (AttrList), + iolist_to_binary ( + [ write_key (EventName), + <>, + write_attrs (AttrList, []) + ] + ). + +from_udp_packet ({ udp, _Socket, SenderIP, SenderPort, Packet }, Format) -> + Extra = + case Format of + tagged -> + [ { ?LWES_IP_ADDR, <<"SenderIP">>, SenderIP }, + { ?LWES_U_INT_16, <<"SenderPort">>, SenderPort }, + { ?LWES_INT_64, <<"ReceiptTime">>, millisecond_since_epoch () } ]; + _ -> + [ { <<"SenderIP">>, SenderIP }, + { <<"SenderPort">>, SenderPort }, + { <<"ReceiptTime">>, millisecond_since_epoch () } ] + end, + from_binary (Packet, Format, Extra). + +from_binary (<<>>,_) -> + undefined; +from_binary (Binary,Format) -> + from_binary (Binary, Format, []). + +%%==================================================================== +%% Internal functions +%%==================================================================== + +from_binary (Binary, Format, Accum0) -> + <> = Binary, + AttrList = read_attrs (Attrs, Format, Accum0), + #lwes_event { name = EventName, attrs = AttrList }. + +type_to_atom (?LWES_TYPE_U_INT_16) -> ?LWES_U_INT_16; +type_to_atom (?LWES_TYPE_INT_16) -> ?LWES_INT_16; +type_to_atom (?LWES_TYPE_U_INT_32) -> ?LWES_U_INT_32; +type_to_atom (?LWES_TYPE_INT_32) -> ?LWES_INT_32; +type_to_atom (?LWES_TYPE_U_INT_64) -> ?LWES_U_INT_64; +type_to_atom (?LWES_TYPE_INT_64) -> ?LWES_INT_64; +type_to_atom (?LWES_TYPE_STRING) -> ?LWES_STRING; +type_to_atom (?LWES_TYPE_BOOLEAN) -> ?LWES_BOOLEAN; +type_to_atom (?LWES_TYPE_IP_ADDR) -> ?LWES_IP_ADDR. + +millisecond_since_epoch () -> + {Meg, Sec, Mic} = os:timestamp(), + trunc (Meg * 1000000000 + Sec * 1000 + Mic / 1000). + +write_attrs ([], Accum) -> + Accum; +write_attrs ([{T,K,V} | Rest], Accum) -> + write_attrs (Rest, [ write_key (K), write (T, V) | Accum ]); +write_attrs ([{K,V} | Rest], Accum) when ?is_int16 (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_INT_16, V) | Accum ]); +write_attrs ([{K,V} | Rest], Accum) when ?is_uint16 (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_U_INT_16, V) | Accum ]); +write_attrs ([{K,V} | Rest], Accum) when ?is_int32 (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_INT_32, V) | Accum ]); +write_attrs ([{K,V} | Rest], Accum) when ?is_uint32 (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_U_INT_32, V) | Accum ]); +write_attrs ([{K,V} | Rest], Accum) when ?is_int64 (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_INT_64, V) | Accum ]); +write_attrs ([{K,V} | Rest], Accum) when ?is_uint64 (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_U_INT_64, V) | Accum ]); +write_attrs ([{K,V} | Rest], Accum) when is_boolean (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_BOOLEAN, V) | Accum ]); +write_attrs ([{K,V} | Rest], Accum) when ?is_string (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_STRING, V) | Accum ]); +write_attrs ([{K,V = {_,_,_,_}} | Rest], Accum) when ?is_ip_addr (V) -> + write_attrs (Rest, [ write_key (K), write (?LWES_IP_ADDR, V) | Accum ]). + +write_key (Key) when is_atom (Key) -> + write_key (atom_to_list (Key)); +write_key (Key) -> + Len = iolist_size (Key), + [ <>, Key]. + +write (uint16, V) -> + <>; +write (int16, V) -> + <>; +write (uint32, V) -> + <>; +write (int32, V) -> + <>; +write (uint64, V) -> + <>; +write (int64, V) -> + <>; +write (ip_addr, {V1, V2, V3, V4}) -> + <>; +write (boolean, true) -> + <>; +write (boolean, false) -> + <>; +write (string, V) when is_atom (V) -> + write (string, atom_to_list (V)); +write (string, V) when is_list (V); is_binary (V) -> + SL = iolist_size (V), + [ <>, V ]. + +read_attrs (<<>>, _Format, Accum) -> + Accum; +read_attrs (Bin, Format, Accum) -> + <> = Bin, + { V, Rest } = read_value (T, Vals), + read_attrs (Rest, Format, + [ case Format of + tagged -> {type_to_atom (T), K, V}; + _ -> {K, V} + end + | Accum ]). + +read_value (?LWES_TYPE_U_INT_16, Bin) -> + <> = Bin, + { V, Rest }; +read_value (?LWES_TYPE_INT_16, Bin) -> + <> = Bin, + { V, Rest }; +read_value (?LWES_TYPE_U_INT_32, Bin) -> + <> = Bin, + { V, Rest }; +read_value (?LWES_TYPE_INT_32, Bin) -> + <> = Bin, + { V, Rest }; +read_value (?LWES_TYPE_U_INT_64, Bin) -> + <> = Bin, + { V, Rest }; +read_value (?LWES_TYPE_INT_64, Bin) -> + <> = Bin, + { V, Rest }; +read_value (?LWES_TYPE_IP_ADDR, Bin) -> + <> = Bin, + { {V4, V3, V2, V1}, Rest }; +read_value (?LWES_TYPE_BOOLEAN, Bin) -> + <> = Bin, + { case V of 0 -> false; _ -> true end, Rest }; +read_value (?LWES_TYPE_STRING, Bin) -> + <> = Bin, + { V, Rest }; +read_value (_, _) -> + throw (unknown_type). + +%%==================================================================== +%% Test functions +%%==================================================================== +-ifdef(EUNIT). + +-endif. diff --git a/src/lwes_internal.hrl b/src/lwes_internal.hrl new file mode 100644 index 0000000..0e590ff --- /dev/null +++ b/src/lwes_internal.hrl @@ -0,0 +1,27 @@ +-ifndef(_lwes_internal_included). +-define(_lwes_internal_included, yup). + +-define (is_int16 (V), V >= -32768, V =< 32767). +-define (is_uint16 (V), V >= 0, V =< 65535). +-define (is_int32 (V), V >= -2147483648, V =< 2147483647). +-define (is_uint32 (V), V >= 0, V =< 4294967295). +-define (is_int64 (V), V >= -9223372036854775808, V =< 9223372036854775807). +-define (is_uint64 (V), V >= 0, V =< 18446744073709551615). +-define (is_string (V), is_list (V); is_binary (V); is_atom (V)). +-define (is_ip_addr (V), + (is_tuple (V) andalso + tuple_size (V) =:= 4 andalso + is_integer (element (1,V)) andalso + element (1,V) >= 0 andalso + element (1,V) =< 255 andalso + is_integer (element (2,V)) andalso + element (2,V) >= 0 andalso + element (2,V) =< 255 andalso + is_integer (element (3,V)) andalso + element (3,V) >= 0 andalso + element (3,V) =< 255 andalso + is_integer (element (4,V)) andalso + element (4,V) >= 0 andalso + element (4,V) =< 255)). + +-endif. diff --git a/src/lwes_sup.erl b/src/lwes_sup.erl new file mode 100644 index 0000000..e5b54fa --- /dev/null +++ b/src/lwes_sup.erl @@ -0,0 +1,57 @@ +-module (lwes_sup). + +-behaviour (supervisor). + +-ifdef(HAVE_EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export([start_link/0]). + +%% supervisor callbacks +-export([init/1]). + +%%==================================================================== +%% API functions +%%==================================================================== +%% @spec start_link() -> ServerRet +%% @doc API for starting the supervisor. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%%==================================================================== +%% supervisor callbacks +%%==================================================================== +%% @spec init([]) -> SupervisorTree +%% @doc supervisor callback. +init([]) -> + { ok, + { + { one_for_one, 10, 10 }, + [ + { lwes_channel_manager, % child spec id + { lwes_channel_manager, start_link, [] },% child function to call + permanent, % always restart + 2000, % time to wait for child stop + worker, % type of child + [ lwes_channel_manager ] % modules used by child + }, + { + lwes_channel_sup, % child spec id + { lwes_channel_sup, start_link, []}, % child function to call + permanent, % always restart + 2000, % time to wait for child stop + supervisor, % type of child + [lwes_channel_sup] % modules used by child + } + ] + } + }. + +%%==================================================================== +%% Test functions +%%==================================================================== +-ifdef(EUNIT). + +-endif. diff --git a/src/lwes_util.erl b/src/lwes_util.erl new file mode 100644 index 0000000..5bedb5c --- /dev/null +++ b/src/lwes_util.erl @@ -0,0 +1,38 @@ +-module (lwes_util). + +-include_lib ("lwes.hrl"). +-include_lib ("lwes_internal.hrl"). + +-ifdef(HAVE_EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +% API +-export ([normalize_ip/1]). + +%%==================================================================== +%% API functions +%%==================================================================== +normalize_ip (Ip) when ?is_ip_addr (Ip) -> + Ip; +normalize_ip (Ip) when is_list (Ip) -> + case inet_parse:address (Ip) of + {ok, {N1, N2, N3, N4}} -> {N1, N2, N3, N4}; + _ -> erlang:error(badarg) + end; +normalize_ip (_) -> + % essentially turns function_clause error into badarg + erlang:error (badarg). + +%%==================================================================== +%% Test functions +%%==================================================================== +-ifdef(EUNIT). + +normalize_ip_test () -> + ?assertEqual ({127,0,0,1}, normalize_ip ("127.0.0.1")), + ?assertEqual ({127,0,0,1}, normalize_ip ({127,0,0,1})), + ?assertError (badarg, normalize_ip ("655.0.0.1")), + ?assertError (badarg, normalize_ip ({655,0,0,1})). + +-endif. diff --git a/tests/Makefile.am.local b/tests/Makefile.am.local new file mode 100644 index 0000000..ba83a1b --- /dev/null +++ b/tests/Makefile.am.local @@ -0,0 +1,9 @@ +TESTS = \ + module-lwes_app \ + module-lwes_channel_manager \ + module-lwes \ + module-lwes_sup \ + module-lwes_channel \ + module-lwes_channel_sup \ + module-lwes_event \ + module-lwes_util -- 2.11.4.GIT