1 package org
.lwes
.journaller
;
3 * A Class that will "dejournal" files written to by the GZIPEventHandler. If you want to
4 * do something other than write the events to stdout, subclass this and override handleEvent.
10 import org
.apache
.commons
.logging
.Log
;
11 import org
.apache
.commons
.logging
.LogFactory
;
12 import org
.apache
.hadoop
.conf
.Configuration
;
13 import org
.apache
.hadoop
.fs
.FileSystem
;
14 import org
.apache
.hadoop
.fs
.Path
;
15 import org
.apache
.hadoop
.io
.BytesWritable
;
16 import org
.apache
.hadoop
.io
.SequenceFile
;
17 import org
.kohsuke
.args4j
.CmdLineException
;
18 import org
.kohsuke
.args4j
.CmdLineParser
;
19 import org
.kohsuke
.args4j
.Option
;
20 import org
.lwes
.Event
;
21 import org
.lwes
.EventSystemException
;
22 import org
.lwes
.db
.EventTemplateDB
;
23 import org
.lwes
.journaller
.util
.EventHandlerUtil
;
24 import org
.lwes
.serializer
.DeserializerState
;
26 import java
.io
.ByteArrayInputStream
;
27 import java
.io
.DataInputStream
;
28 import java
.io
.EOFException
;
30 import java
.io
.FileInputStream
;
31 import java
.io
.IOException
;
32 import java
.util
.zip
.GZIPInputStream
;
34 public class DeJournaller
implements Runnable
, JournallerConstants
{
35 private static transient Log log
= LogFactory
.getLog(DeJournaller
.class);
37 @Option(name
= "--seq", aliases
= "--sequence")
38 protected boolean sequence
= false;
40 @Option(name
= "-g", aliases
= "--gzipped")
41 protected boolean gzipped
;
43 @Option(name
= "-f", aliases
= "--file")
44 protected String fileName
;
46 @Option(name
= "-e", aliases
= "--esf-file")
47 protected String esfFile
;
49 @Option(name
= "-v", aliases
= "validate")
50 protected boolean validate
= false;
52 public DeJournaller() {
57 if (fileName
== null || "".equals(fileName
)) {
58 log
.error("File name was not specified");
62 EventTemplateDB evtTemplate
= new EventTemplateDB();
63 DeserializerState state
= new DeserializerState();
65 if (getEsfFile() != null) {
66 evtTemplate
.setESFFile(new File(getEsfFile()));
68 log
.debug("esf: " + evtTemplate
.getESFFile());
69 log
.debug("validate: " + validate
);
70 evtTemplate
.initialize();
72 DataInputStream in
= null;
74 log
.debug("Opening file: " + fileName
);
76 processSequenceFile(fileName
);
80 in
= new DataInputStream(new GZIPInputStream(new FileInputStream(fileName
)));
83 in
= new DataInputStream(new FileInputStream(fileName
));
86 while ((evt
= EventHandlerUtil
.readEvent(in
, state
, evtTemplate
, validate
)) != null) {
92 catch (EOFException e
) {
93 // this is normal. Just catch and ignore.
96 log
.error(e
.getMessage(), e
);
103 catch (IOException e
) {
104 log
.error(e
.getMessage(), e
);
111 protected void processSequenceFile(String file
) {
112 Configuration conf
= new Configuration();
113 FileSystem fs
= null;
114 SequenceFile
.Reader reader
= null;
116 if (log
.isDebugEnabled()) {
117 log
.debug("Opening file: " + file
);
119 fs
= FileSystem
.get(conf
);
120 Path p
= new Path(file
);
121 reader
= new SequenceFile
.Reader(fs
, p
, conf
);
123 BytesWritable key
= (BytesWritable
) reader
.getKeyClass().newInstance();
124 BytesWritable value
= (BytesWritable
) reader
.getKeyClass().newInstance();
126 EventTemplateDB templ
= new EventTemplateDB();
127 DeserializerState state
= new DeserializerState();
128 while (reader
.next(key
, value
)) {
129 byte[] evtBytes
= new byte[key
.getLength()+value
.getLength()];
130 System
.arraycopy(key
.getBytes(), 0,
133 System
.arraycopy(value
.getBytes(), 0,
135 key
.getLength(), value
.getLength());
136 Event evt
= EventHandlerUtil
.readEvent
137 (new DataInputStream(new ByteArrayInputStream(evtBytes
)),
140 if (log
.isDebugEnabled()) {
141 log
.debug("read a k/v: "+evt
.toOneLineString());
146 catch (IOException e
) {
147 log
.error(e
.getMessage(), e
);
149 catch (InstantiationException e
) {
150 log
.error(e
.getMessage(), e
);
152 catch (IllegalAccessException e
) {
153 log
.error(e
.getMessage(), e
);
155 catch (EventSystemException e
) {
156 log
.error(e
.getMessage(), e
);
159 if (reader
!= null) {
163 catch (IOException e
) {
164 log
.error(e
.getMessage(), e
);
171 * This method is called after all events in the file have been processed.
174 if (log
.isDebugEnabled()) {
180 * Subclass and override this method if you want to do something other than print the
181 * events to the logging system. For example, you may want to write these events to a
186 public void handleEvent(Event event
) {
187 System
.out
.println(event
.toOneLineString());
190 protected void parseArguments(String
[] args
) throws CmdLineException
{
191 CmdLineParser parser
= new CmdLineParser(this);
192 parser
.parseArgument(args
);
195 public static void main(String
[] args
) {
196 DeJournaller dj
= new DeJournaller();
198 dj
.parseArguments(args
);
200 catch (CmdLineException e
) {
201 log
.error(e
.getMessage(), e
);
206 public String
getFileName() {
210 public void setFileName(String fileName
) {
211 this.fileName
= fileName
;
214 public String
getEsfFile() {
218 public void setEsfFile(String esfFile
) {
219 this.esfFile
= esfFile
;
222 public boolean isGzipped() {
226 public void setGzipped(boolean gzipped
) {
227 this.gzipped
= gzipped
;
230 public boolean isValidate() {
234 public void setValidate(boolean validate
) {
235 this.validate
= validate
;