您现在的位置是:首页 > 博客日记 > Java Java

hadoop MapReduct小案例

2019-09-18 22:17:54

JobSubmit.class

  1. package cn.reduce;
  2. import java.net.URI;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11. public class JobSubmit {
  12. public static void main(String[] args) {
  13. try {
  14. //在代码中设置jvm系统参数,用于给job对象来获取访问hdfs的用户身份
  15. System.setProperty("HADOOP_USER_NAME", "root");
  16. Configuration conf = new Configuration();
  17. //1.设置job运行时要访问的默认文件系统
  18. conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
  19. //2.设置jobi叫到哪去运行
  20. conf.set("mapreduce.framework.name", "yarn");
  21. //3.
  22. conf.set("yarn.resourcemanager.hostname", "hadoop01");
  23. //4.如果要从windows系统上运行这个job提交客户端程序,则需要加上这个跨平台提交的参数
  24. conf.set("mapreduce.app-submission.cross-platform", "true");
  25. Job job = Job.getInstance(conf);
  26. //设置jar 位置 在windows上运行 需要打jar包 放到E:/hadoop下的jar
  27. job.setJar("E:\\hadoop\\wc.jar");
  28. //1.封装参数 jar位置 在linux 上运行时需要开启
  29. // job.setJarByClass(JobSubmit.class);
  30. //2.封装参数:本次job所要调用的Mapper实现类 、Reducer实现类
  31. job.setMapperClass(WordcountMapper.class);
  32. job.setReducerClass(ReduceMapper.class);
  33. //3.封装参数:本次job所要调用的mapper实现类、Reducer实现类 的结果数据的key、value类型
  34. job.setMapOutputKeyClass(Text.class);
  35. job.setMapOutputValueClass(IntWritable.class);
  36. job.setOutputKeyClass(Text.class);
  37. job.setOutputValueClass(IntWritable.class);
  38. Path output = new Path("/wordcount/output");
  39. FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"),conf,"root");
  40. if(fs.exists(output)){
  41. fs.delete(output, true);
  42. }
  43. //4.封装参数: 本次job要处理的输入数据集所在路径、最终结果集路径
  44. FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
  45. FileOutputFormat.setOutputPath(job, output);
  46. //5.封装参数:想要启动的reduce task 的数量
  47. job.setNumReduceTasks(2);
  48. //6.提交job给yarn
  49. boolean res = job.waitForCompletion(true);
  50. System.exit(res?0:1);
  51. }catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }

ReduceMapper.class

  1. import java.io.IOException;
  2. import java.util.Iterator;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. /**
  7. * Text, IntWritable 是你wordcountMapper 输出的 key value 类型
  8. * @author hgz
  9. *
  10. */
  11. public class ReduceMapper extends Reducer<Text, IntWritable, Text, IntWritable>{
  12. @Override
  13. protected void reduce(Text key, Iterable<IntWritable> values,
  14. Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  15. Iterator<IntWritable> iterator = values.iterator();
  16. int count =0;
  17. while (iterator.hasNext()) {
  18. IntWritable next = iterator.next();
  19. count +=next.get();
  20. context.write(key, new IntWritable(count));
  21. }
  22. }
  23. }

WordcountMapper .class

  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. /**
  7. * KEYIN 是map task 读取到的数据的key的类型,是一行的起始偏移量 Long VALUEIN 是map task
  8. * 去读到的数据的value的类型,是一行的内容 String KEYOUT 是用户的自定义map方法要返回的结果key,value
  9. * 数据的key的类型,在wordcount逻辑中,我们需要返回单词String
  10. * VALYEOUT是用户自定义map方法要返回的结果key,value的value类型,在wordcount逻辑中,我们要返回的是整型
  11. *
  12. * 在mapreduce中,map产生的数据需要传输给reduce,需要实现mapreduce的 序列化和反序列化 Long LongWritable
  13. * String Text INTERGER INTWRITABLE FLOAT
  14. *
  15. * @author hgz
  16. *
  17. */
  18. public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  19. @Override
  20. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
  21. throws IOException, InterruptedException {
  22. String line =value.toString();
  23. String[] split = line.split(" ");
  24. for (String newKey : split) {
  25. context.write(new Text(newKey), new IntWritable(1));
  26. }
  27. }
  28. }


关注TinyMeng博客,更多精彩分享,敬请期待!