added 1.0.0 changes
[lwes-journaller-java.git] / src / main / java / org / lwes / journaller / DeJournaller.java
blob4d24fab26a4699829a4c8e731d18ec4cf6177eca
1 package org.lwes.journaller;
2 /**
3 * A class that "dejournals" journal files, such as those written by the GZIPEventHandler; plain, gzipped (--gzipped)
4 * and Hadoop SequenceFile (--seq) inputs are supported. To do something other than write the events to stdout, subclass this and override handleEvent.
6 * User: fmaritato
7 * Date: Apr 16, 2009
8 */
10 import org.apache.commons.logging.Log;
11 import org.apache.commons.logging.LogFactory;
12 import org.apache.hadoop.conf.Configuration;
13 import org.apache.hadoop.fs.FileSystem;
14 import org.apache.hadoop.fs.Path;
15 import org.apache.hadoop.io.BytesWritable;
16 import org.apache.hadoop.io.SequenceFile;
17 import org.kohsuke.args4j.CmdLineException;
18 import org.kohsuke.args4j.CmdLineParser;
19 import org.kohsuke.args4j.Option;
20 import org.lwes.Event;
21 import org.lwes.EventSystemException;
22 import org.lwes.db.EventTemplateDB;
23 import org.lwes.journaller.util.EventHandlerUtil;
24 import org.lwes.serializer.DeserializerState;
26 import java.io.ByteArrayInputStream;
27 import java.io.DataInputStream;
28 import java.io.EOFException;
29 import java.io.File;
30 import java.io.FileInputStream;
31 import java.io.IOException;
32 import java.util.zip.GZIPInputStream;
34 public class DeJournaller implements Runnable, JournallerConstants {
35 private static transient Log log = LogFactory.getLog(DeJournaller.class);
37 @Option(name = "--seq", aliases = "--sequence")
38 protected boolean sequence = false;
40 @Option(name = "-g", aliases = "--gzipped")
41 protected boolean gzipped;
43 @Option(name = "-f", aliases = "--file")
44 protected String fileName;
46 @Option(name = "-e", aliases = "--esf-file")
47 protected String esfFile;
49 @Option(name = "-v", aliases = "validate")
50 protected boolean validate = false;
52 public DeJournaller() {
55 public void run() {
57 if (fileName == null || "".equals(fileName)) {
58 log.error("File name was not specified");
59 return;
62 EventTemplateDB evtTemplate = new EventTemplateDB();
63 DeserializerState state = new DeserializerState();
65 if (getEsfFile() != null) {
66 evtTemplate.setESFFile(new File(getEsfFile()));
68 log.debug("esf: " + evtTemplate.getESFFile());
69 log.debug("validate: " + validate);
70 evtTemplate.initialize();
72 DataInputStream in = null;
73 try {
74 log.debug("Opening file: " + fileName);
75 if (sequence) {
76 processSequenceFile(fileName);
78 else {
79 if (gzipped) {
80 in = new DataInputStream(new GZIPInputStream(new FileInputStream(fileName)));
82 else {
83 in = new DataInputStream(new FileInputStream(fileName));
85 Event evt;
86 while ((evt = EventHandlerUtil.readEvent(in, state, evtTemplate, validate)) != null) {
87 state.reset();
88 handleEvent(evt);
92 catch (EOFException e) {
93 // this is normal. Just catch and ignore.
95 catch (Exception e) {
96 log.error(e.getMessage(), e);
98 finally {
99 if (in != null) {
100 try {
101 in.close();
103 catch (IOException e) {
104 log.error(e.getMessage(), e);
107 done();
111 protected void processSequenceFile(String file) {
112 Configuration conf = new Configuration();
113 FileSystem fs = null;
114 SequenceFile.Reader reader = null;
115 try {
116 if (log.isDebugEnabled()) {
117 log.debug("Opening file: " + file);
119 fs = FileSystem.get(conf);
120 Path p = new Path(file);
121 reader = new SequenceFile.Reader(fs, p, conf);
123 BytesWritable key = (BytesWritable) reader.getKeyClass().newInstance();
124 BytesWritable value = (BytesWritable) reader.getKeyClass().newInstance();
126 EventTemplateDB templ = new EventTemplateDB();
127 DeserializerState state = new DeserializerState();
128 while (reader.next(key, value)) {
129 byte[] evtBytes = new byte[key.getLength()+value.getLength()];
130 System.arraycopy(key.getBytes(), 0,
131 evtBytes, 0,
132 key.getLength());
133 System.arraycopy(value.getBytes(), 0,
134 evtBytes,
135 key.getLength(), value.getLength());
136 Event evt = EventHandlerUtil.readEvent
137 (new DataInputStream(new ByteArrayInputStream(evtBytes)),
138 state, templ);
139 state.reset();
140 if (log.isDebugEnabled()) {
141 log.debug("read a k/v: "+evt.toOneLineString());
143 handleEvent(evt);
146 catch (IOException e) {
147 log.error(e.getMessage(), e);
149 catch (InstantiationException e) {
150 log.error(e.getMessage(), e);
152 catch (IllegalAccessException e) {
153 log.error(e.getMessage(), e);
155 catch (EventSystemException e) {
156 log.error(e.getMessage(), e);
158 finally {
159 if (reader != null) {
160 try {
161 reader.close();
163 catch (IOException e) {
164 log.error(e.getMessage(), e);
171 * This method is called after all events in the file have been processed.
173 public void done() {
174 if (log.isDebugEnabled()) {
175 log.debug("done");
180 * Subclass and override this method if you want to do something other than print the
181 * events to the logging system. For example, you may want to write these events to a
182 * database.
184 * @param event
186 public void handleEvent(Event event) {
187 System.out.println(event.toOneLineString());
190 protected void parseArguments(String[] args) throws CmdLineException {
191 CmdLineParser parser = new CmdLineParser(this);
192 parser.parseArgument(args);
195 public static void main(String[] args) {
196 DeJournaller dj = new DeJournaller();
197 try {
198 dj.parseArguments(args);
200 catch (CmdLineException e) {
201 log.error(e.getMessage(), e);
203 dj.run();
206 public String getFileName() {
207 return fileName;
210 public void setFileName(String fileName) {
211 this.fileName = fileName;
214 public String getEsfFile() {
215 return esfFile;
218 public void setEsfFile(String esfFile) {
219 this.esfFile = esfFile;
222 public boolean isGzipped() {
223 return gzipped;
226 public void setGzipped(boolean gzipped) {
227 this.gzipped = gzipped;
230 public boolean isValidate() {
231 return validate;
234 public void setValidate(boolean validate) {
235 this.validate = validate;