快上网专注成都网站设计 成都网站制作 成都网站建设
成都网站建设公司服务热线:028-86922220

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

java如何使用多线程读取超大文件

这篇文章主要介绍了java如何使用多线程读取超大文件,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

公司主营业务:网站设计、做网站、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。成都创新互联公司是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。成都创新互联公司推出寿县免费做网站回馈大家。

基本思路如下:

1.计算出文件总大小

2.分段处理,计算出每个线程读取文件的开始与结束位置

(文件大小/线程数)*N,N是指第几个线程,这样能得到每个线程在读该文件的大概起始位置

使用"大概起始位置",作为读文件的开始偏移量(fileChannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起 始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1

3.启动线程,每个线程从开始位置读取到结束位置为止

代码如下:

读文件工具类

import java.io.*;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.util.Observable; /** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-2 * Time: 下午3:12 * 读取文件 */public class ReadFile extends Observable {  private int bufSize = 1024; // 换行符 private byte key = "\n".getBytes()[0]; // 当前行数 private long lineNum = 0; // 文件编码,默认为gb2312 private String encode = "gb2312"; // 具体业务逻辑监听器 private ReaderFileListener readerListener;  public void setEncode(String encode) {  this.encode = encode; }  public void setReaderListener(ReaderFileListener readerListener) {  this.readerListener = readerListener; }  /**  * 获取准确开始位置  * @param file  * @param position  * @return  * @throws Exception  */ public long getStartNum(File file, long position) throws Exception {  long startNum = position;  FileChannel fcin = new RandomAccessFile(file, "r").getChannel();  fcin.position(position);  try {   int cache = 1024;   ByteBuffer rBuffer = ByteBuffer.allocate(cache);   // 每次读取的内容   byte[] bs = new byte[cache];   // 缓存   byte[] tempBs = new byte[0];   String line = "";   while (fcin.read(rBuffer) != -1) {    int rSize = rBuffer.position();    rBuffer.rewind();    rBuffer.get(bs);    rBuffer.clear();    byte[] newStrByte = bs;    // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面    if (null != tempBs) {     int tL = tempBs.length;     newStrByte = new byte[rSize + tL];     System.arraycopy(tempBs, 0, newStrByte, 0, tL);     System.arraycopy(bs, 0, newStrByte, tL, rSize);    }    // 获取开始位置之后的第一个换行符    int endIndex = indexOf(newStrByte, 0);    if (endIndex != -1) {     return startNum + endIndex;    }    tempBs = substring(newStrByte, 0, newStrByte.length);    startNum += 1024;   }  } catch (Exception e) {   e.printStackTrace();  } finally {   fcin.close();  }  return position; }  /**  * 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾  * @param fullPath  * @param start  * @param end  * @throws Exception  */ public void readFileByLine(String fullPath, long start, long end) throws Exception {  File fin = new File(fullPath);  if (fin.exists()) {   FileChannel fcin = new RandomAccessFile(fin, "r").getChannel();   fcin.position(start);   try {    ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);    // 每次读取的内容    byte[] bs = new byte[bufSize];    // 缓存    byte[] tempBs = new byte[0];    String line = "";    // 当前读取文件位置    long nowCur = start;    while (fcin.read(rBuffer) != -1) {     nowCur += bufSize;      int rSize = rBuffer.position();     rBuffer.rewind();     rBuffer.get(bs);     rBuffer.clear();     byte[] newStrByte = bs;     // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面     if (null != tempBs) {      int tL = tempBs.length;      newStrByte = new byte[rSize + tL];      System.arraycopy(tempBs, 0, newStrByte, 0, tL);      System.arraycopy(bs, 0, newStrByte, tL, rSize);     }     // 是否已经读到最后一位     boolean isEnd = false;     // 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置     if (end > 0 && nowCur > end) {      // 缓存长度 - 当前已经读取位数 - 最后位数      int l = newStrByte.length - (int) (nowCur - end);      newStrByte = substring(newStrByte, 0, l);      isEnd = true;     }     int fromIndex = 0;     int endIndex = 0;     // 每次读一行内容,以 key(默认为\n) 作为结束符     while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {      byte[] bLine = substring(newStrByte, fromIndex, endIndex);      line = new String(bLine, 0, bLine.length, encode);      lineNum++;      // 输出一行内容,处理方式由调用方提供      readerListener.outLine(line.trim(), lineNum, false);      fromIndex = endIndex + 1;     }     // 将未读取完成的内容放到缓存中     tempBs = substring(newStrByte, fromIndex, newStrByte.length);     if (isEnd) {      break;     }    }    // 将剩下的最后内容作为一行,输出,并指明这是最后一行    String lineStr = new String(tempBs, 0, tempBs.length, encode);    readerListener.outLine(lineStr.trim(), lineNum, true);   } catch (Exception e) {    e.printStackTrace();   } finally {    fcin.close();   }   } else {   throw new FileNotFoundException("没有找到文件:" + fullPath);  }  // 通知观察者,当前工作已经完成  setChanged();  notifyObservers(start+"-"+end); }  /**  * 查找一个byte[]从指定位置之后的一个换行符位置  *  * @param src  * @param fromIndex  * @return  * @throws Exception  */ private int indexOf(byte[] src, int fromIndex) throws Exception {   for (int i = fromIndex; i < src.length; i++) {   if (src[i] == key) {    return i;   }  }  return -1; }  /**  * 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[]  *  * @param src  * @param fromIndex  * @param endIndex  * @return  * @throws Exception  */ private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception {  int size = endIndex - fromIndex;  byte[] ret = new byte[size];  System.arraycopy(src, fromIndex, ret, 0, size);  return ret; } }

读文件线程

/** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-2 * Time: 下午4:50 * To change this template use File | Settings | File Templates. */public class ReadFileThread extends Thread {  private ReaderFileListener processPoiDataListeners; private String filePath; private long start; private long end;  public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) {  this.setName(this.getName()+"-ReadFileThread");  this.start = start;  this.end = end;  this.filePath = file;  this.processPoiDataListeners = processPoiDataListeners; }  @Override public void run() {  ReadFile readFile = new ReadFile();  readFile.setReaderListener(processPoiDataListeners);  readFile.setEncode(processPoiDataListeners.getEncode());//  readFile.addObserver();  try {   readFile.readFileByLine(filePath, start, end + 1);  } catch (Exception e) {   e.printStackTrace();  } }}

具体业务逻辑监听

/** * Created with Okey * User: Okey * Date: 13-3-14 * Time: 下午3:19 * NIO逐行读数据回调方法 */public abstract class ReaderFileListener {  // 一次读取行数,默认为500 private int readColNum = 500;  private String encode;  private List list = new ArrayList();  /**  * 设置一次读取行数  * @param readColNum  */ protected void setReadColNum(int readColNum) {  this.readColNum = readColNum; }  public String getEncode() {  return encode; }  public void setEncode(String encode) {  this.encode = encode; }  /**  * 每读取到一行数据,添加到缓存中  * @param lineStr 读取到的数据  * @param lineNum 行号  * @param over 是否读取完成  * @throws Exception  */ public void outLine(String lineStr, long lineNum, boolean over) throws Exception {  if(null != lineStr)   list.add(lineStr);  if (!over && (lineNum % readColNum == 0)) {   output(list);   list.clear();  } else if (over) {   output(list);   list.clear();  } }  /**  * 批量输出  *  * @param stringList  * @throws Exception  */ public abstract void output(List stringList) throws Exception; }

线程调度

import java.io.File;import java.io.FileInputStream;import java.io.IOException; /** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-1 * Time: 下午6:03 * To change this template use File | Settings | File Templates. */public class BuildData { public static void main(String[] args) throws Exception {  File file = new File("E:\\1396341974289.csv");  FileInputStream fis = null;  try {   ReadFile readFile = new ReadFile();   fis = new FileInputStream(file);   int available = fis.available();   int maxThreadNum = 50;   // 线程粗略开始位置   int i = available / maxThreadNum;   for (int j = 0; j < maxThreadNum; j++) {    // 计算精确开始位置    long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);    long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2;    // 具体监听实现    ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk");    new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();   }  } catch (IOException e) {   e.printStackTrace();  } catch (Exception e) {   e.printStackTrace();  } }}

现在就可以尽情的调整maxThreadNum来享受风一般的速度吧!

感谢你能够认真阅读完这篇文章,希望小编分享的“java如何使用多线程读取超大文件”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!


名称栏目:java如何使用多线程读取超大文件
链接URL:http://6mz.cn/article/jsessi.html

其他资讯