使用Java处理大文件

标签: Web开发 | 发表时间:2014-04-07 00:00 | 作者:踏雁寻花
出处:http://www.importnew.com

我最近要处理一套存储历史实时数据的大文件 fx market data,我很快便意识到,使用传统的InputStream不能够将它们读取到内存,因为每一个文件都超过了4G。甚至编辑器都不能够打开这些文件。

在这种特殊情况下,我可以写一个简单的bash脚本将这些文件分成更小的文件块,然后再读取它。但是我不想这样做,因为二进制格式会使这个方法失效。

处理这个问题的方式通常就是使用内存映射文件递增地处理区域的数据。关于内存映射文件的一个好处就是它们不会使用虚拟内存和换页空间,因为它们是从磁盘上的文件返回来的数据。

很好,让我们来看一看这些文件和额外的一些数据。似乎它们使用逗号分隔的字段包含ASCII文本行。

格式: [currency-pair],[timestamp],[bid-price],[ask-price]

例子: EUR/USD,20120102 00:01:30.420,1.29451,1.2949

我可以为这种格式去写一个程序,但是,读取文件和解析文件是无关的概念。让我们退一步来想一个通用的设计,当在将来面临相似的问题时这个设计可以被重复利用。

这个问题可以归结为递增地解码一个已经在无限长的数组中被编码的记录,并且没有耗尽内存。实际上,以逗号分割的示例格式编码与通常的解决方案是不相关的。所以,很明显需要一个解码器来处理不同的格式。

再来看,知道整个文件处理完成,每一条记录都不能被解析并保存在内存中,所以我们需要一种方式来转移记录,在它们成为垃圾被回收之前可以被写到其他地方,例如磁盘或者网络。

迭代器是处理这个需求的很好的抽象,因为它们就像游标一样,可以正确的指向某个位置。每一次迭代都可以转发文件指针,并且可以让我们使用数据做其他的事情。

首先来写一个Decoder 接口,递增地把对象从 MappedByteBuffer中解码,如果buffer中没有对象,则返回null。

    public interface Decoder<T> {
        public T decode(ByteBuffer buffer);
    }

然后让FileReader 实现Iterable接口。每一个迭代器将会处理下一个4096字节的数据,并使用Decoder把它们解码成一个对象的List集合。注意,FileReader 接收文件(files)的list对象,这样是很好的,因为它可以遍历数据,并且不需要考虑聚合的问题。顺便说一下,4096个字节块对于大文件来说是非常小的。

    public class FileReader implements Iterable<List<T>> {
      private static final long CHUNK_SIZE = 4096;
      private final Decoder<T> decoder;
      private Iterator<File> files;
    
      private FileReader(Decoder<T> decoder, File... files) {
        this(decoder, Arrays.asList(files));
      }
      private FileReader(Decoder<T> decoder, List<File> files) {
        this.files = files.iterator();
        this.decoder = decoder;
      }
      public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) {
        return new FileReader<T>(decoder, files);
      }
    
      public static <T> FileReader<T> create(Decoder<T> decoder, File... files) {
        return new FileReader<T>(decoder, files);
      }
      @Override
      public Iterator<List<T>> iterator() {
        return new Iterator<List<T>>() {
          private List<T> entries;
          private long chunkPos = 0;
          private MappedByteBuffer buffer;
          private FileChannel channel;
          @Override
          public boolean hasNext() {
            if (buffer == null || !buffer.hasRemaining()) {
              buffer = nextBuffer(chunkPos);
              if (buffer == null) {
                return false;
              }
            }
            T result = null;
            while ((result = decoder.decode(buffer)) != null) {
              if (entries == null) {
                entries = new ArrayList<T>();
              }
              entries.add(result);
            }
            // set next MappedByteBuffer chunk
            chunkPos += buffer.position();
            buffer = null;
            if (entries != null) {
              return true;
            } else {
              Closeables.closeQuietly(channel);
              return false;
            }
          }
    
          private MappedByteBuffer nextBuffer(long position) {
            try {
              if (channel == null || channel.size() == position) {
                if (channel != null) {
                  Closeables.closeQuietly(channel);
                  channel = null;
                }
                if (files.hasNext()) {
                  File file = files.next();
                  channel = new RandomAccessFile(file, "r").getChannel();
                  chunkPos = 0;
                  position = 0;
                } else {
                  return null;
                }
              }
              long chunkSize = CHUNK_SIZE;
              if (channel.size() - position < chunkSize) {
                chunkSize = channel.size() - position;
              }
               return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize);
            } catch (IOException e) {
               Closeables.closeQuietly(channel);
               throw new RuntimeException(e);
            }
          }
    
          @Override
          public List<T> next() {
            List<T> res = entries;
            entries = null;
            return res;
          }
    
          @Override
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    }

下一个任务就是写一个Decoder 。针对逗号分隔的任何文本格式,编写一个TextRowDecoder 类。接收的参数是每行字段的数量和一个字段分隔符,返回byte的二维数组。TextRowDecoder 可以被操作不同字符集的特定格式解码器重复利用。

public class TextRowDecoder implements Decoder<byte[][]> {
  private static final byte LF = 10;
  private final int numFields;
  private final byte delimiter;
  public TextRowDecoder(int numFields, byte delimiter) {
   this.numFields = numFields;
   this.delimiter = delimiter;
  }
  @Override
  public byte[][] decode(ByteBuffer buffer) {
    int lineStartPos = buffer.position();
    int limit = buffer.limit();
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == LF) { // reached line feed so parse line
        int lineEndPos = buffer.position();
        // set positions for one row duplication
        if (buffer.limit() < lineEndPos + 1) {
          buffer.position(lineStartPos).limit(lineEndPos);
        } else {
          buffer.position(lineStartPos).limit(lineEndPos + 1);
        }
        byte[][] entry = parseRow(buffer.duplicate());
        if (entry != null) {
          // reset main buffer
          buffer.position(lineEndPos);
          buffer.limit(limit);
          // set start after LF
          lineStartPos = lineEndPos;
        }
        return entry;
      }
    }
    buffer.position(lineStartPos);
    return null;
  }
 
  public byte[][] parseRow(ByteBuffer buffer) {
    int fieldStartPos = buffer.position();
    int fieldEndPos = 0;
    int fieldNumber = 0;
    byte[][] fields = new byte[numFields][];
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == delimiter || b == LF) {
        fieldEndPos = buffer.position();
        // save limit
        int limit = buffer.limit();
        // set positions for one row duplication
        buffer.position(fieldStartPos).limit(fieldEndPos);
        fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);
        fieldNumber++;
        // reset main buffer
        buffer.position(fieldEndPos);
        buffer.limit(limit);
        // set start after LF
        fieldStartPos = fieldEndPos;
      }
      if (fieldNumber == numFields) {
        return fields;
      }
    }
    return null;
  }
 
  private byte[] parseField(ByteBuffer buffer, int pos, int length) {
    byte[] field = new byte[length];
    for (int i = 0; i < field.length; i++) {
      field[i] = buffer.get();
    }
    return field;
  }
}

这是文件被处理的过程。每一个List包含的元素都从一个单独的buffer中解码,每一个元素都是被TextRowDecoder定义的byte二维数组。

TextRowDecoder decoder = new TextRowDecoder(4, comma);
FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles());
for (List<byte[][]> chunk : reader) {
  // do something with each chunk
}

我们可以在这里打住,不过还有额外的需求。每一行都包含一个时间戳,每一批都必须分组,使用时间段来代替buffers,如按照天分组、或者按照小时分组。我还想要遍历每一批的数据,因此,第一反应就是,为FileReader创建一个Iterable包装器,实现它的行为。一个额外的细节,每一个元素必须通过实现Timestamped接口(这里没有显示)提供时间戳到PeriodEntries。

         
      public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> {
      private final Iterator<List<T extends Timestamped>> entriesIt;
      private final long interval;
      private PeriodEntries(Iterable<List<T>> entriesIt, long interval) {
        this.entriesIt = entriesIt.iterator();
        this.interval = interval;
      }
    
      public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) {
       return new PeriodEntries<T>(entriesIt, interval);
      }
    
      @Override
      public Iterator<List<T extends Timestamped>> iterator() {
        return new Iterator<List<T>>() {
          private Queue<List<T>> queue = new LinkedList<List<T>>();
          private long previous;
          private Iterator<T> entryIt;
    
          @Override
          public boolean hasNext() {
            if (!advanceEntries()) {
              return false;
            }
            T entry =  entryIt.next();
            long time = normalizeInterval(entry);
            if (previous == 0) {
              previous = time;
            }
            if (queue.peek() == null) {
              List<T> group = new ArrayList<T>();
              queue.add(group);
            }
            while (previous == time) {
              queue.peek().add(entry);
              if (!advanceEntries()) {
                break;
              }
              entry = entryIt.next();
              time = normalizeInterval(entry);
            }
            previous = time;
            List<T> result = queue.peek();
            if (result == null || result.isEmpty()) {
              return false;
            }
            return true;
          }
    
          private boolean advanceEntries() {
            // if there are no rows left
            if (entryIt == null || !entryIt.hasNext()) {
              // try get more rows if possible
              if (entriesIt.hasNext()) {
                entryIt = entriesIt.next().iterator();
                return true;
              } else {
                // no more rows
                return false;
              }
            }
            return true;
          }
    
          private long normalizeInterval(Timestamped entry) {
            long time = entry.getTime();
            int utcOffset = TimeZone.getDefault().getOffset(time);
            long utcTime = time + utcOffset;
            long elapsed = utcTime % interval;
            return time - elapsed;
          }
          @Override
          public List<T> next() {
            return queue.poll();
          }
          @Override
          public void remove() {
            throw new UnsupportedOperationException();
          }
       };
      }
    }

最后的处理代码通过引入这个函数并无太大变动,只有一个干净的且紧密的循环,不必关心文件、缓冲区、时间周期的分组元素。PeriodEntries也是足够的灵活管理任何时长的时间。

TrueFxDecoder decoder = new TrueFxDecoder();
FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles());
long periodLength = TimeUnit.DAYS.toMillis(1);
PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength);
 
for (List<TrueFxData> entries : periods) {
   // data for each day
   for (TrueFxData entry : entries) {
     // process each entry
   }
}

你也许意识到了,使用集合不可能解决这样的问题;选择迭代器是一个关键的设计决策,能够解析兆字节的数组,且不会消耗过多的空间。

可能感兴趣的文章

相关 [java 理大 文件] 推荐:

使用Java处理大文件

- - ImportNew
我最近要处理一套存储历史实时数据的大文件 fx market data,我很快便意识到,使用传统的InputStream不能够将它们读取到内存,因为每一个文件都超过了4G. 甚至编辑器都不能够打开这些文件. 在这种特殊情况下,我可以写一个简单的bash脚本将这些文件分成更小的文件块,然后再读取它.

Java的 class文件结构

- - Java - 编程语言 - ITeye博客
Java-class文件结构.        我们都知道我们现在写的源代码计算机是不认识的,我们需要根据指定的编译器进行编译-连接-执行,这样才是我们想要的结果,所以计算机只能认识0或者1 ,那么如何与操作系统或者机器指令无关的程序能执行,那么在操作系统以及机器指令之上的那就是虚拟机了,这样我们编写的代码不再是最终形成二进制本地指令代码,而是一种在操作系统和机器指令之上的虚拟机规定的文件格式.

Java写xml文件的编码问题

- - CSDN博客推荐文章
最近项目中需要生成xml格式的配置文件,用的是 javax.xml.transform.Transformer 类中提供的transform方法,在本地执行没问题,但是一旦把工程部署到Tomcat下运行,就会出现中文乱码的现象,纠结了许久,在大神的帮助下终于解决了. 有篇文章其实已经讲的很清楚了,链接如下:.

JAVA 动态(手动)加载jar文件

- - CSDN博客编程语言推荐文章
//filePath 是jar的绝对路径. //里面是一个url的数组,可以同时加载多个. //根据类名加载指定类,例:. //通过反射调用类中的方法,例如调用addFile方法,有1个String参数和一个int参数:. 如果有返回值,则直接返回需要的值,例:. 作者:kingboy123000 发表于2013-10-23 16:12:30 原文链接.

Java读取远程主机文件

- - ITeye博客
//Java利用第三方jar包(ganymed-ssh2-build210.jar)读取远程主机文件. String idxMasterPath = useHome+"ADR/cb/data/master/";   //主侧IDX文件路径. String idxSlavePath = useHome+"ADR/cb/data/slave/";     //备侧文件路径.

分析一个Java Class文件

- - CSDN博客推荐文章
Java源码文件TestClass.java:. 展示这个Class文件的16进制内容:. 00 00 00 34 : 版本号是1.8.0;. 00 16: 说明常量池有21个常量,1-21, index留做他用;接下来就是分别这21个常量的描述:. 07/00 02 :CONSTANT_Class_info 常量,类名索引是该常量池的第2项;.

Java根据文件头获取文件类型

- - BlogJava_首页
文件头是位于文件开头的一段承担一定任务的数据,一般都在开头的部分. 头文件作为一种包含功能函数、数据接口声明的载体文件,用于保存程序的声明(declaration),而定义文件用于保存程序的实现 (implementation).      为了解决在用户上传文件的时候在服务器端判断文件类型的问题,故用获取文件头的方式,直接读取文件的前几个字节,来判断上传文件是否符合格式.

java,根据头文件判断文件类型

- - Java - 编程语言 - ITeye博客
web判断文件的格式的时候,我们一般都是通过扩展名来判断,这个有点不太靠谱和安全. 下面是利用头文件来判断的代码:. * 根据文件流读取图片文件真实类型.  网上搜索了一些头文件的格式,但是强调一下,txt文件没有固定的头名,需要另外考虑,供参考:. JPEG (jpg),文件头:FFD8FF  .

处理XML文件的三种方式(Java)

- - CSDN博客推荐文章
    DOM(Document Object Model),文档对象模型,DOM可以以一种独立于平台和语言的方式访问和修改一个文档的内容和结构. 换句话说,这是表示和处理一个HTML或XML文档的常用方法. 有一点很重要,DOM的设计是以对象管理组织(OMG)的规约为基的,因此可以用于任何编程语言.

java实现把文件上传至ftp服务器

- - CSDN博客互联网推荐文章
用java实现ftp文件上传. 我使用的是commons-net-1.4.1.zip. 其中包含了众多的java网络编程的工具包. 1 把commons-net-1.4.1.jar包加载到项目工程中去. * Description: 向FTP服务器上传文件. * @param url FTP服务器hostname.