merged sequence file processing into the exisitng classes
[lwes-journaller-java.git] / src / main / java / org / lwes / journaller / CountDeJournaller.java
blob8ffa32ac93d334ad80bb028934aea3fbe2e4ea6f
1 package org.lwes.journaller;
2 /**
3 * @author fmaritato
4 */
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.apache.hadoop.conf.Configuration;
9 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.io.BytesWritable;
12 import org.apache.hadoop.io.NullWritable;
13 import org.apache.hadoop.io.SequenceFile;
14 import org.kohsuke.args4j.CmdLineException;
15 import org.lwes.db.EventTemplateDB;
16 import org.lwes.journaller.util.EventHandlerUtil;
17 import org.lwes.serializer.DeserializerState;
19 import java.io.DataInputStream;
20 import java.io.EOFException;
21 import java.io.FileInputStream;
22 import java.io.IOException;
23 import java.util.zip.GZIPInputStream;
25 public class CountDeJournaller extends DeJournaller {
27 private static final transient Log log = LogFactory.getLog(CountDeJournaller.class);
29 private int counter = 0;
31 public void run() {
32 if (fileName == null || "".equals(fileName)) {
33 System.err.println("File name was not specified");
34 return;
37 EventTemplateDB evtTemplate = new EventTemplateDB();
38 evtTemplate.initialize();
40 DeserializerState state = new DeserializerState();
42 DataInputStream in = null;
43 try {
44 if (sequence) {
45 processSequenceFile(fileName);
47 else {
48 if (gzipped) {
49 in = new DataInputStream(new GZIPInputStream(new FileInputStream(fileName)));
51 else {
52 in = new DataInputStream(new FileInputStream(fileName));
54 byte[] bytes;
55 while ((bytes = EventHandlerUtil.readEvent(in, state)) != null) {
56 state.reset();
57 handleEvent(bytes);
61 catch (EOFException e) {
62 // this is normal. Just catch and ignore.
64 catch (Exception e) {
65 e.printStackTrace();
67 finally {
68 if (in != null) {
69 try {
70 in.close();
72 catch (IOException e) {
73 e.printStackTrace();
76 done();
78 System.out.println("Found: " + counter + " events.");
81 protected void processSequenceFile(String file) {
83 Configuration conf = new Configuration();
84 FileSystem fs = null;
85 SequenceFile.Reader reader = null;
86 try {
87 if (log.isDebugEnabled()) {
88 log.debug("Opening file: " + getFileName());
90 fs = FileSystem.get(conf);
91 Path p = new Path(file);
92 reader = new SequenceFile.Reader(fs, p, conf);
94 BytesWritable key = (BytesWritable) reader.getKeyClass().newInstance();
95 NullWritable value = NullWritable.get();
97 while (reader.next(key, value)) {
98 handleEvent(key.getBytes());
101 catch (InstantiationException e) {
102 log.error(e.getMessage(), e);
104 catch (IllegalAccessException e) {
105 log.error(e.getMessage(), e);
107 catch (IOException e) {
108 log.error(e.getMessage(), e);
112 public void handleEvent(byte[] event) {
113 counter++;
116 public static void main(String[] args) {
117 CountDeJournaller dj = new CountDeJournaller();
118 try {
119 dj.parseArguments(args);
121 catch (CmdLineException e) {
122 e.printStackTrace();
124 dj.run();