上班第一天,老板就甩来30G文件是种什么体验...

如果给你一个包含一亿行数据的超大文件,让你将数据转化导入生产数据库,你会如何操作?
上面的问题是接到的一个真实的业务需求,将一个老系统历史数据通过线下文件的方式迁移到新的生产系统 。
由于时间紧,而数据量又超大,所以设计的过程想到一下解决办法:
拆分文件
首先我们可以写个小程序,或者使用拆分命令 split 将这个超大文件拆分一个个小文件 。
-- 将一个大文件拆分成若干个小文件,每个文件 100000 行split -l 100000 largeFile.txt -d -a 4 smallFile_
【上班第一天,老板就甩来30G文件是种什么体验...】这里之所以选择先将大文件拆分,主要考虑到两个原因:
第一如果程序直接读取这个大文件,假设读取一半的时候,程序突然宕机,这样就会直接丢失文件读取的进度,又需要重新开头读取 。
而文件拆分之后,一旦小文件读取结束,我们可以将小文件移动一个指定文件夹 。
这样即使应用程序宕机重启,我们重新读取时,只需要读取剩余的文件 。
第二,一个文件,只能被一个应用程序读取,这样就限制了导入的速度 。
而文件拆分之后,我们可以采用多节点部署的方式,水平扩展 。每个节点读取一部分文件,这样就可以成倍的加快导入速度 。
多线程导入
当我们拆分完文件,接着我们就需要读取文件内容,进行导入 。
之前拆分的时候,设置每个小文件包含 10w 行的数据 。由于担心一下子将 10w 数据读取应用中,导致堆内存占用过高,引起频繁的 「Full GC」,所以下面采用流式读取的方式,一行一行的读取数据 。
当然了,如果拆分之后文件很小,或者说应用的堆内存设置很大,我们可以直接将文件加载到应用内存中处理 。这样相对来说简单一点 。
逐行读取的代码如下:
File file = ...try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {while (iterator.hasNext()) {String line=iterator.nextLine();convertToDB(line);}}
上面代码使用 -io 中的 类,这个类底层使用了读取文件内容 。它将其封装成迭代器模式,这样我们可以很方便的迭代读取 。
如果当前使用 JDK1.8,那么上述操作更加简单,我们可以直接使用 JDK 原生的类 Files将文件转成方式读取,代码如下:
Files.lines(Paths.get("文件路径"), Charset.defaultCharset()).forEach(line -> {convertToDB(line);});
其实仔细看下 Files#lines底层源码,其实原理跟上面的 类似,同样也是封装成迭代器模式 。
多线程的引入存在的问题
上述读取的代码写起来不难,但是存在效率问题,主要是因为只有单线程在导入,上一行数据导入完成之后,才能继续操作下一行 。
为了加快导入速度,那我们就多来几个线程,并发导入 。
多线程我们自然将会使用线程池的方式,相关代码改造如下:
File file = ...;ExecutorService executorService = new ThreadPoolExecutor(5,10,60,TimeUnit.MINUTES,// 文件数量,假设文件包含 10W 行new ArrayBlockingQueue<>(10*10000),// guava 提供new ThreadFactoryBuilder().setNameFormat("test-%d").build());try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {while (iterator.hasNext()) {String line = iterator.nextLine();executorService.submit(() -> {convertToDB(line);});}}
上述代码中,每读取到一行内容,就会直接交给线程池来执行 。
我们知道线程池原理如下:
如果核心线程数未满,将会直接创建线程执行任务 。如果核心线程数已满,将会把任务放入到队列中 。如果队列已满,将会再创建线程执行任务 。如果最大线程数已满,队列也已满,那么将会执行拒绝策略 。