Hadoop的整文件读取方法

这篇文章主要讲解了“Hadoop的整文件读取方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Hadoop的整文件读取方法”吧!

写Hadoop程序时,有时候需要读取整个文件,而不是分片读取,但默认的为分片读取,所以,只有编写自己的整文件读取类。

需要编写的有:

WholeInputFormat类,继承自FileInputFormat类

WholeRecordReader类,继承自RecordReader类

其中,用于读取的类是WholeRecordReader类。以下代码以Text为key值类型,BytesWritable为value的类型,因为大多数格式在hadoop中都没有相应的类型支持,比如jpg,sdf,png等等,在hadoop中都没有相应的类,但是都可以转换为byte[]字节流,然后在转化为BytesWritable类型,最后在Map或者Reduce再转换成java中的相应类型。

代码如下,解释见 :

import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WholeInputFormat extends FileInputFormat<Text, BytesWritable> {     @Override     public RecordReader<Text, BytesWritable> createRecordReader (InputSplit split, TaskAttemptContext context)       throws IOException,InterruptedException       {         return new WholeRecordReader();      }     @Override     //判断是否分片,false表示不分片,true表示分片。      //其实这个不写也可以,因为在WholeRecordReader中一次性全部读完      protected boolean isSplitable(JobContext context,Path file)      {          return false;      } }

下面是WholeRecordReader类:

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class WholeRecordReader extends RecordReader<Text,BytesWritable> {      //Hadoop中处理文件的类      private FileSplit fileSplit;      private FSDataInputStream in = null;        private BytesWritable value = null;      private Text key = null;            //用于判断文件是否读取完成      //也就是因为这个,所以WholeInputFormat中的isSplitable方法可以不用写      private boolean processed = false;        @Override      public void close() throws IOException       {         //do nothing      }      @Override      public Text getCurrentKey() throws IOException, InterruptedException       {           return this.key;      }        @Override      public BytesWritable getCurrentValue() throws IOException,InterruptedException       {           return this.value;      }        @Override      public float getProgress() throws IOException, InterruptedException       {           return processed ? fileSplit.getLength() : 0;      }         @Override      public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException       {           //打开一个文件输入流           fileSplit = (FileSplit)split;           Configuration job = context.getConfiguration();           Path file = fileSplit.getPath();           FileSystem temp = file.getFileSystem(job);           in = temp.open(file);      }      @Override      public boolean nextKeyValue() throws IOException, InterruptedException      {           if(key == null)           {               key = new Text();           }              if(value == null)           {               value = new BytesWritable();           }              if(!processed)           {               //申请一个字节数组保存将从文件中读取的内容               byte[] content = new byte[(int)fileSplit.getLength()];               Path file = fileSplit.getPath();               //以文件的名字作为传递给Map函数的key值,可以自行设置               key.set(file.getName());                        try{                //读取文件中的内容                IOUtils.readFully(in,content,0,content.length);                //将value的值设置为byte[]中的值                value.set(new BytesWritable(content));               }catch(IOException e)               {                    e.printStackTrace();               }finally{                //关闭输入流                IOUtils.closeStream(in);               }                        //将processed设置成true,表示读取文件完成,以后不再读取               processed = true;               return true;           }                    return false;      } }

当把这些写好后,在main()函数或者run()函数里面将job的输入格式设置成WholeInputFormat,如下:

job.setInputFormatClass(WholeInputFormat.class);

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。