Merge pull request #14 from lwes/pluggable-emission
[lwes-erlang/github-mirror.git] / src / lwes_journaller.erl
blob4e89eac191e39092f58d82ea41f7382a4583d2b1
2 % This module implements an lwes journaller.
4 % configuration is as follows
6 % [ { root, "." }, % journal root
7 % { name, "all_events.log" }, % journal name
8 % { interval, <rotation_interval> }, % interval for journal file rotation
9 % ]
11 -module (lwes_journaller).
13 -behaviour (gen_server).
15 -include_lib ("lwes_internal.hrl").
17 %% API
18 -export ([ start_link/1,
19 process_event/2,
20 rotate/1,
21 format_header/5 ]).
23 %% gen_server callbacks
24 -export ([ init/1,
25 handle_call/3,
26 handle_cast/2,
27 handle_info/2,
28 terminate/2,
29 code_change/3
30 ]).
% State kept by the journaller gen_server between callbacks.
32 -record (state, {
33 journal_root, % directory holding the journal (config key 'root')
34 journal_file_name, % journal base name (config key 'name')
35 journal_file_ext, % extension appended to the base name; init/1 sets "gz"
36 journal_current, % io device returned by open/3 for the live journal
37 journal_last_rotate, % epoch seconds recorded at startup and at each rotation
38 timer % reference returned by timer:apply_interval/4 in init/1
39 }).
41 %%====================================================================
42 %% API
43 %%====================================================================
%% Starts a journaller process linked to the caller.
%% Config is a proplist; init/1 reads the keys root, name and interval from it.
%% Returns whatever gen_server:start_link/3 returns.
start_link (Config) ->
  InitArgs = [Config],
  gen_server:start_link (?MODULE, InitArgs, []).
%% Hands Event to the journaller process Pid without waiting for it to be
%% written, and returns Pid so the caller can keep passing it along.
process_event (Event, Pid) ->
  Message = {process, Event},
  gen_server:cast (Pid, Message),
  Pid.
%% Asks the journaller process Pid to rotate its journal file.
%% Asynchronous: returns ok immediately.
rotate (Pid) ->
  Message = {rotate},
  gen_server:cast (Pid, Message).
54 %%====================================================================
55 %% gen_server callbacks
56 %%====================================================================
% gen_server init callback.
% Config is the proplist given to start_link/1, which wraps it in a
% one-element list. Opens the journal file, schedules periodic rotation and
% builds the initial #state{}.
% Crashes with badmatch if the journal cannot be opened or the timer cannot
% be created.
57 init ([Config]) ->
58 % get application variables
59 Root = proplists:get_value (root, Config, "."),
60 Name = proplists:get_value (name, Config, "all_events.log"),
% multiplied by 1000 before being handed to the timer below
61 Interval = proplists:get_value (interval, Config, 60),
62 Ext = "gz",
64 % I want terminate to be called
65 process_flag (trap_exit, true),
67 % open journal file
68 { ok, File } = open (Root, Name, Ext),
70 % setup rotation of journal
% the timer calls rotate/1 with this process' pid on every tick
71 { ok, TRef } =
72 timer:apply_interval (Interval * 1000, ?MODULE, rotate, [self()]),
74 { ok, #state {
75 journal_root = Root,
76 journal_file_name = Name,
77 journal_file_ext = Ext,
78 journal_current = File,
79 journal_last_rotate = seconds_since_epoch (),
80 timer = TRef
% NOTE(review): the closing of the record and of the return tuple is not
% visible in this view of the file.
%% gen_server call callback.
%% The journaller defines no synchronous requests: any call is logged as a
%% warning and answered with ok, leaving the state untouched.
handle_call (Unknown, Caller, State) ->
  LogArgs = [Unknown, Caller],
  error_logger:warning_msg ("Unrecognized call ~p from ~p~n", LogArgs),
  { reply, ok, State }.
%% Builds the 22 byte binary header written in front of each journalled
%% event. All fields are big-endian unsigned integers:
%%
%%   2 bytes  EventSize
%%   8 bytes  MillisTimestamp
%%   4 bytes  address octets, emitted last-to-first (Ip4, Ip3, Ip2, Ip1)
%%   2 bytes  Port
%%   2 bytes  SiteId
%%   4 bytes  zero
%%
%% Ip must be a 4-tuple; arguments failing the guards raise function_clause.
format_header (EventSize, MillisTimestamp, {Ip1, Ip2, Ip3, Ip4} = Ip, Port, SiteId)
  when ?is_uint16(EventSize), ?is_uint64(MillisTimestamp), ?is_ip_addr(Ip),
       ?is_uint16(Port), ?is_uint16(SiteId) ->
  % integer segments default to unsigned big-endian
  Address = <<Ip4:8, Ip3:8, Ip2:8, Ip1:8>>,
  Padding = <<0:32>>,
  <<EventSize:16,
    MillisTimestamp:64,
    Address/binary,
    Port:16,
    SiteId:16,
    Padding/binary>>.
% gen_server cast callback, three clauses:
%   {process, {udp, _, Ip, Port, B}} - write a header for packet B to the
%                                      current journal
%   {rotate}                         - close, rename and reopen the journal
%   anything else                    - log a warning and carry on

% Journal one event. The header carries the size of B, the current time in
% milliseconds, the sender Ip and Port, and a hard-coded site id of 1.
% Crashes with badmatch if the write fails.
102 handle_cast ( {process, {udp, _, Ip, Port, B}},
103 State = #state { journal_current = Journal }) ->
104 S = byte_size (B),
105 M = milliseconds_since_epoch (),
106 SiteId = 1,
107 ok = file:write ( Journal,
108 [ format_header(S, M, Ip, Port, SiteId),
% NOTE(review): the rest of the write list is not visible in this view.
111 { noreply, State };
% Rotate: close the live journal, move it aside with rename/4, open a fresh
% one and remember when this happened.
112 handle_cast ( {rotate}, State = #state {
113 journal_root = Root,
114 journal_file_name = Name,
115 journal_file_ext = Ext,
116 journal_current = File,
117 journal_last_rotate = LastRotate
118 }) ->
% the result of file:close/1 is not checked
119 file:close (File),
120 rename (Root, Name, Ext, LastRotate),
121 {ok, NewFile} = open (Root, Name, Ext),
122 { noreply, State#state { journal_current = NewFile,
123 journal_last_rotate = seconds_since_epoch () }};
% Any other cast is logged and ignored.
124 handle_cast (Request, State) ->
125 error_logger:warning_msg ("Unrecognized cast ~p~n",[Request]),
126 { noreply, State }.
%% gen_server info callback.
%% The journaller expects no raw messages: each one is logged as a warning
%% and dropped, leaving the state untouched.
handle_info (Unexpected, State) ->
  LogArgs = [Unexpected],
  error_logger:warning_msg ("Unrecognized info ~p~n", LogArgs),
  {noreply, State}.
% gen_server terminate callback.
% Closes the live journal and moves it aside with rename/4, the same steps a
% rotation performs, except that no new journal is opened here.
132 terminate (_Reason, #state {
133 journal_root = Root,
134 journal_file_name = Name,
135 journal_file_ext = Ext,
136 journal_current = File,
137 journal_last_rotate = LastRotate
138 }) ->
% the result of file:close/1 is not checked
139 file:close (File),
140 rename (Root, Name, Ext, LastRotate),
% NOTE(review): the final expression of this function is not visible in this
% view of the file.
%% gen_server code_change callback.
%% The state needs no conversion between versions, so it is handed back as is.
code_change (_FromVersion, CurrentState, _UpgradeData) ->
  {ok, CurrentState}.
146 %%====================================================================
147 %% Internal
148 %%====================================================================
%% Opens the live journal <Root>/<Name>.<Ext> for writing.
%% The file is opened in raw mode with the compressed option.
%% Returns the result of file:open/2.
open (Root, Name, Ext) ->
  BaseName = Name ++ "." ++ Ext,
  JournalPath = filename:join ([Root, BaseName]),
  Modes = [ write, raw, compressed ],
  file:open (JournalPath, Modes).
%% Moves the (already closed) live journal aside so a new one can be opened.
%%
%% The live file <Root>/<Name>.<Ext> is renamed to
%%   <Root>/<Name>.<YYYYMMDDhhmmss>.<LastRotate>.<Now>.<Ext>
%% where the date stamp is in universal time and LastRotate / Now are seconds
%% since the epoch.
%%
%% Returns ok; crashes with badmatch if file:rename/2 fails.
rename (Root, Name, Ext, LastRotate) ->
  % read the clock once, so the date stamp and the closing epoch second in
  % the file name always describe the same instant
  Timestamp = {MegaSecs, Secs, _MicroSecs} = os:timestamp (),
  {{Year,Month,Day},{Hour,Minute,Second}} =
    calendar:now_to_universal_time (Timestamp),
  RotatedAt = MegaSecs * 1000000 + Secs,
  RotatedName =
    io_lib:format
      ("~s.~4.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B.~b.~b.~s",
       [ Name, Year, Month, Day, Hour, Minute, Second, LastRotate,
         RotatedAt, Ext ]),
  % filename:join/1 accepts the deep list produced by io_lib:format/2
  NewFile = filename:join ([Root, RotatedName]),
  CurrentFile = filename:join ([Root, string:join ([Name, Ext], ".")]),
  % trailing ~n added to match the other log calls in this module
  error_logger:info_msg ("rename ~p -> ~p~n", [CurrentFile, NewFile]),
  ok = file:rename (CurrentFile, NewFile).
%% Current OS time as whole milliseconds since the epoch.
%% Microseconds are truncated, not rounded.
milliseconds_since_epoch () ->
  {MegaSecs, Secs, MicroSecs} = os:timestamp (),
  WholeSeconds = MegaSecs * 1000000 + Secs,
  WholeSeconds * 1000 + MicroSecs div 1000.
%% Current OS time as whole seconds since the epoch.
seconds_since_epoch () ->
  {MegaSecs, Secs, _MicroSecs} = os:timestamp (),
  MegaSecs * 1000000 + Secs.
175 %%====================================================================
176 %% Test functions
177 %%====================================================================
178 -ifdef (TEST).
179 -include_lib ("eunit/include/eunit.hrl").
181 -endif.