大数据量txt文本数据去重不像想象中那么简单,因为涉及到内存溢出问题,所以不能直接在内存中进行循环比对处理。本分块去重代码的原理是:根据每行字符串的hashcode值范围将字符串分文件存放,因为hashcode不同,字符串就一定不同,所以经过hashcode分开后的文件之间不存在重复的字符串。最后将小文件再整合到同一个文件中即可。
package com.files;
import Java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PartFile {
// 内存监控
final static Runtime currRuntime = Runtime.getRuntime ();
// 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用
final static long MEMERY_LIMIT = 1024 * 1024 * 3;
// 内存限制,我内存最多容纳的文件大小
static final long FILE_LIMIT_SIZE = 1024*1024*20;
// 文件写入缓冲区 ,我默认1M
static final int CACHE_SIZE = 1024*1024;
// 默认文件后缀
static final String FILE_SUFFIX = ".txt";
// 临时分割的文件目录,可以删除~。~
static final String FILE_PREFIX = "test/";
// 汇总的文件名
static final String REQUST_FILE_NAME = "resultFile.txt";
// 存放大文件 引用,以及分割位置
static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>();
// 存放小文件的,驱除重复数据
static Map<String,String> fileLinesMap = new HashMap<String,String>(10000);
public static void main(String[] args) {
long begin = System.currentTimeMillis();
new PartFile().partFile(new File("bigData.txt"),
Integer.MAX_VALUE,Integer.MIN_VALUE);
long result = System.currentTimeMillis()-begin;
System.out.println("除去重复时间为:"+result +" 毫秒");
}
// 按hashCode 范围分割
public void partFile(File origFile,long maxNum,long minNum) {
String line = null;
long hashCode = 0;
long max_left_hashCode = 0;
long min_left_hashCode = 0;
long max_right_hashCode = 0;
long min_right_hashCode = 0;
BufferedWriter rightWriter = null;
BufferedWriter leftWriter = null;
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(origFile));
long midNum = (maxNum+minNum)/2;
// 以文件hashCode 范围作为子文件名
File leftFile = new File(FILE_PREFIX+minNum+"_"+midNum+FILE_SUFFIX);
File rightFile = new File(FILE_PREFIX+midNum +"_"+maxNum+FILE_SUFFIX);
leftWriter = new BufferedWriter(new FileWriter(leftFile),CACHE_SIZE);
rightWriter = new BufferedWriter(new FileWriter(rightFile),CACHE_SIZE);
ChildFile leftChild = new ChildFile(leftFile);
ChildFile rightChild = new ChildFile(rightFile);
// 字符串 组合写入也行
// StringBuilder leftStr = new StringBuilder(100000);
// StringBuilder rightStr = new StringBuilder(100000);
// hashCode 的范围作为分割线
while ((line = reader.readLine()) != null) {
hashCode = line.hashCode();
if (hashCode > midNum) {
if(max_right_hashCode < hashCode || max_right_hashCode == 0){
max_right_hashCode = hashCode;
}else if(min_right_hashCode > hashCode || min_right_hashCode == 0){
min_right_hashCode = hashCode;
}
// 按行写入缓存
writeToFile(rightWriter, line);
}else {
if(max_left_hashCode < hashCode || max_left_hashCode == 0){
max_left_hashCode = hashCode;
}else if(min_left_hashCode > hashCode || min_left_hashCode == 0){
min_left_hashCode = hashCode;
}
writeToFile(leftWriter, line);
}
}
// 保存子文件信息
leftChild.setHashCode(min_left_hashCode, max_left_hashCode);
rightChild.setHashCode(min_right_hashCode, max_right_hashCode);
closeWriter(rightWriter);
closeWriter(leftWriter);
closeReader(reader);
// 删除原始文件,保留最原始的文件
if(!origFile.getName().equals("bigData.txt")){
origFile.delete();
}
// 分析子文件信息,是否写入或者迭代
analyseChildFile(rightChild, leftChild);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
// 分析子文件信息
public void analyseChildFile(ChildFile rightChild,ChildFile leftChild){
// 将分割后 还是大于内存的文件保存 继续分割
File rightFile = rightChild.getChildFile();
if(isSurpassFileSize(rightFile)){
bigChildFiles.add(rightChild);
}else if(rightFile.length()>0){
orderAndWriteToFiles(rightFile);
}
File leftFile = leftChild.getChildFile();
if(isSurpassFileSize(leftFile)){
bigChildFiles.add(leftChild);
}else if(leftFile.length()>0){
orderAndWriteToFiles(leftFile);
}
// 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出
if(bigChildFiles.size() > 0 ){
ChildFile e = bigChildFiles.get(bigChildFiles.size()-1);
bigChildFiles.remove(e);
// 迭代分割
partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode());
}
}
// 将小文件读到内存排序除重复
public void orderAndWriteToFiles(File file){
BufferedReader reader = null;
String line = null;
BufferedWriter totalWriter = null;
StringBuilder sb = new StringBuilder(1000000);
try {
totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,true),CACHE_SIZE);
reader = new BufferedReader(new FileReader(file));
while((line = reader.readLine()) != null){
if(!fileLinesMap.containsKey(line)){
fileLinesMap.put(line, null);
sb.append(line+"\r\n");
//totalWriter.write(line+"\r\n");
}
}
totalWriter.write(sb.toString());
fileLinesMap.clear();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
closeReader(reader);
closeWriter(totalWriter);
// 删除子文件
file.delete();
}
}
// 判断该文件是否超过 内存限制
public boolean isSurpassFileSize(File file){
return FILE_LIMIT_SIZE < file.length();
}
// 将数据写入文件
public void writeToFile(BufferedWriter writer, String writeInfo) {
try {
writer.write(writeInfo+"\r\n");
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭流
public void closeReader(Reader reader) {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 关闭流
public void closeWriter(Writer writer) {
if (writer != null) {
try {
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 内部类,记录子文件信息
class ChildFile{
// 文件 和 内容 hash 分布
File childFile;
long maxHashCode;
long minHashCode;
public ChildFile(File childFile){
this.childFile = childFile;
}
public ChildFile(File childFile, long maxHashCode, long minHashCode) {
super();
this.childFile = childFile;
this.maxHashCode = maxHashCode;
this.minHashCode = minHashCode;
}
public File getChildFile() {
return childFile;
}
public void setChildFile(File childFile) {
this.childFile = childFile;
}
public long getMaxHashCode() {
return maxHashCode;
}
public void setMaxHashCode(long maxHashCode) {
this.maxHashCode = maxHashCode;
}
public long getMinHashCode() {
return minHashCode;
}
public void setMinHashCode(long minHashCode) {
this.minHashCode = minHashCode;
}
public void setHashCode(long minHashCode,long maxHashCode){
this.setMaxHashCode(maxHashCode);
this.setMinHashCode(minHashCode);
}
}
}
方法分析:
1.采用hashCode 范围迭代分割的方式,可以分割成内存可以容纳的小文件,然后完成功能
2.我们发现每次迭代,相当于重复读取里面的文件,然后再进行分割,这样浪费了很多时间,那么有没有更好的方式呢? 我们可以这样设计,假设我们知道文件的总大小,已经大概的行数,比如2G,1亿行,我们一开始就分配区间,在分配完全均匀的情况下,1亿行数据,最多占用1亿个空间,那么可以这样分配,用hashCode 的范围,也就是Integer的最大值和最小值进行模拟分配。分配范围 根据内存进行,比如:读取第一行的的hashCode 值为100,那么,我们可以分配到1-1000000,(这里以100W 为单位),也就是说只要hashCode 范围在这个区间的都分配到这里,同理,读到任何一个hashCode 值,除以单位(100W),就能找到你的区间,比如hasCode 是 2345678,那么就是200W-300W的区间。这里有些区间可能空的,就删除了,有些区间很多,就用上面的迭代,空间足够 就直接写入汇总文件。当然区间单位的颗粒度划分,根据内存和数据总量 自己弄,这样下来就会一次读取 ,就能尽量的分配均匀,就不会大量迭代读取浪费时间了。
3.我们发现分割的时候是直接写入,没有进行任何排序或者其他操作,如果我们在分割的时候保存一部分到集合内存,发现有重复的,就不写入分割文件了,如果写入量超过集合限制了,就清空集合,这样能保证单个小文件 一次达到除重复的效果,大文件部分除重复,减少子文件的数据量,如果重复数据较多,可以采用,极端的情况下完全不重复,那么集合会浪费你的空间,并且除重复的时候会浪费你的时间,这里自己对数据进行整体考虑。
4.这里内存的控制是我测试进行控制,用JDK 的方法进行内存监控,因为涉及到回收频率 以及时间上的问题,我没有动态的对集合进行监控,占了多少内存,还剩多少内存等等,可以尝试。