Simple cleaning and summarization of log data with MapReduce

algorain

Simple cleaning and summarization of log data with MapReduce and Spark

First, a MapReduce job splits each log line and extracts the time, ip, and url fields; the reducer then aggregates the records by IP address, counting how many times each IP appears, and writes the result to HDFS. To carry the data through the shuffle I use a bean, which implements the WritableComparable interface.
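The original post does not show a raw log sample, but the mapper below implies that each relevant line carries a marker string followed by tab-separated fields, roughly like this (the field contents are placeholders, not real data):

[yy-MM-dd ...] ... (cn.baidu.core.inteceptor.LogInteceptor:55)\t<ip>\t<url>

From such a line the mapper emits the pair (ip, LogBean{time, ip, url}); the time is taken from characters 1–9 of the line, i.e. the date following the opening bracket.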

To use this, first import the BaiduLog and LogBean classes into your project and add the corresponding Maven dependency (a sample is shown after the command below). Then export the project's jar to the virtual machine, upload the log file to HDFS, and run:

hadoop jar rain-hadoop-1.0-SNAPSHOT.jar com.rain.mapreduce.BaiduLog /data/baidu.log /data/log/clean5
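For reference, the only Maven dependency the job needs is the Hadoop client; a minimal sketch (the version here is an assumption — match it to your cluster):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>

Once the job finishes, the result can be read back with the standard HDFS shell. Given the reducer below, each output line is a count followed by one representative LogBean for that IP, along the lines of (values are placeholders):

hdfs dfs -cat /data/log/clean5/part-r-00000
523	LogBean{time='...', ip='...', url='...'}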

BaiduLog.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class BaiduLog {

    public static class BaiduLogMapper extends Mapper<LongWritable, Text, Text, LogBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String log = value.toString();
            // Only lines produced by this interceptor are of interest.
            String str = "(cn.baidu.core.inteceptor.LogInteceptor:55)";
            if (log.indexOf(str) != -1) {
                // Note: split() treats the marker as a regex; the outer
                // parentheses act as a harmless capturing group.
                String[] log_arr = log.split(str);
                String time = log_arr[0].substring(1, 10);
                String[] log_arr2 = log_arr[1].split("\t");
                String ip = log_arr2[1];
                String url = log_arr2[2];
                if (url.equals("null")) {
                    url = log_arr2[3];
                }
                LogBean logbean = new LogBean(time, ip, url);
                context.write(new Text(ip), logbean);
            }
        }
    }

    public static class BaiduLogReducer extends Reducer<Text, LogBean, IntWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<LogBean> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            StringBuffer str = new StringBuffer();
            int flag = 0;
            // Count the records for this ip and keep one representative bean.
            for (LogBean logbean : values) {
                sum++;
                if (flag == 0) {
                    str.append(logbean.toString());
                    flag = 1;
                }
            }
            context.write(new IntWritable(sum), new Text(str.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "baidulog");

        job.setJarByClass(BaiduLog.class);
        job.setMapperClass(BaiduLog.BaiduLogMapper.class);
        job.setReducerClass(BaiduLog.BaiduLogReducer.class);

        // A combiner can't simply reuse BaiduLogReducer here: a combiner must
        // emit the map output types (Text/LogBean), not (IntWritable/Text).
        // job.setCombinerClass(BaiduLog.BaiduLogReducer.class);

        // The map output types differ from the final output types, so both
        // pairs must be declared explicitly or the job fails at runtime.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LogBean.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
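The subtitle mentions Spark as well, but the original post only shows the MapReduce version. For comparison, a minimal sketch of the same per-IP counting in Spark's Java API could look like the following (the class name BaiduLogSpark is hypothetical, and the field extraction simply mirrors BaiduLogMapper above):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Hypothetical Spark counterpart of BaiduLog; not part of the original post.
public class BaiduLogSpark {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("baidulog-spark");
        JavaSparkContext sc = new JavaSparkContext(conf);

        final String marker = "(cn.baidu.core.inteceptor.LogInteceptor:55)";

        JavaRDD<String> lines = sc.textFile(args[0]);

        // Same extraction as BaiduLogMapper: keep marked lines, pull the ip
        // out of the tab-separated tail, then count occurrences per ip.
        JavaPairRDD<String, Integer> counts = lines
                .filter(line -> line.contains(marker))
                .mapToPair(line -> {
                    String[] fields = line.split(marker)[1].split("\t");
                    return new Tuple2<>(fields[1], 1);
                })
                .reduceByKey((a, b) -> a + b);

        counts.saveAsTextFile(args[1]);
        sc.stop();
    }
}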

LogBean.java

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class LogBean implements WritableComparable<LogBean> {
    private String time;
    private String ip;
    private String url;

    // A no-arg constructor is required so Hadoop can instantiate the bean
    // during deserialization.
    public LogBean() {
        super();
    }

    public LogBean(String time, String ip, String url) {
        this.time = time;
        this.ip = ip;
        this.url = url;
    }

    @Override
    public String toString() {
        return "LogBean{" +
                "time='" + time + '\'' +
                ", ip='" + ip + '\'' +
                ", url='" + url + '\'' +
                '}';
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    // The bean is only used as a map output value, so no real ordering is
    // needed; a stub comparison is enough.
    @Override
    public int compareTo(LogBean o) {
        return 0;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(time);
        out.writeUTF(ip);
        out.writeUTF(url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        time = in.readUTF();
        ip = in.readUTF();
        url = in.readUTF();
    }
}
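A note on the interface choice: because LogBean is only ever used as a map-side value, implementing Writable alone would suffice. WritableComparable (and a meaningful compareTo) is only required when a type is used as a key, where Hadoop sorts by it during the shuffle; the stub compareTo above is therefore harmless here.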


  • Title: Simple cleaning and summarization of log data with MapReduce
  • Author: algorain
  • Created at: 2018-09-19 09:41:54
  • Updated at: 2023-05-14 21:39:50
  • Link: http://www.rain1024.com/2018/09/19/article134/
  • License: This work is licensed under CC BY-NC-SA 4.0.