当前位置: 代码迷 >> J2SE >> 关于多线程的数据分发有关问题
  详细解决方案

关于多线程的数据分发有关问题

热度:56   发布时间:2016-04-23 20:42:38.0
关于多线程的数据分发问题
最近写一个多线程的,处理文件夹中文xml文件,我照着别人写的一个多线程的例子仿写的。下面是获取xml数据的代码。
我想的是把获取的xml路径放入stack中,每个线程都从这个stack中获取数据,只到栈空。
有几个问题:
1.在栈中取数据的代码是互锁的,不能同时取数据,这样的情况,我开5个线程,比起一个线程会不会提高数据的处理速度?
2.我想把读取的数据分成5个栈(或者其他的如hashtable),给每个线程一个单独的栈,这样就不会有锁的问题了,可是这样数据该怎么传给线程呢?
3.如果数据量太大,栈溢出怎么办呢
下面首先是我的线程的run方法
private traverFolder traverfolder;
public void run() {

try
{
synchronized (traverfolder.getfileStack()) {
//选取所有的可以执行的脚本,并放入待抓取队列
doWork();
}
LOG.info("sleep "  + " seconds");
Thread.sleep(1);
}
catch(Exception e)
{

}
}

private void doWork() {
//待提取的xml文件栈
Stack<String> fileStack = null;

//不允许各提取线程同时从待提取栈中取文件
synchronized (traverFolder.getInstance().getfileStack()) {
//从待提取栈中取出第一个xml
if ((fileStack = traverFolder.getInstance()
.getfileStack()) != null) {
while(!fileStack.empty())
{
String dir = fileStack.pop();
extractXML(dir);
}

} else {
// LOG.info("executableScriptQueue is empty");
try {
//阻塞抓取线程,等待可用的抓取脚本
//由脚本调度线程唤醒
LOG.info(threadID + " wait for next fileStack file");
traverFolder.getInstance().getfileStack()
.wait();
} catch (Exception e) {
LOG.error("wait for notify by fileStack error",
e);
}
}
}

然后是获取数据的代码
class traverFolder {
private static final Log LOG = LogFactory.getLog("traverFolder");
private static Stack<String> fileStack; 

//单例
private static traverFolder traverfolder;

private traverFolder() {//构造函数
//初始化xml文件栈、待提取栈为空
fileStack = new Stack<String>();
//初始化文件栈
traverDirs();
}
/**
 * 单例
 * @return
 */
public static traverFolder getInstance() {
if (traverfolder == null)
traverfolder = new traverFolder();
return traverfolder;
}
public synchronized Stack<String> getfileStack() {
return fileStack;
}
/**
 * 遍历文件夹方法
 * @param strPath
 */
public static void refreshFileList(String strPath) {
File dir = new File(strPath);
    File[] files = dir.listFiles();
                 
    if (files == null)
        return;
    for (int i = 0; i < files.length; i++) {
        if (files[i].isDirectory()) {
            refreshFileList(files[i].getAbsolutePath());
        } else {
            String strFileName = files[i].getAbsolutePath().toLowerCase();
            System.out.println("---"+strFileName);
                   // filelist.add(files[i].getAbsolutePath());
            fileStack.push(files[i].getAbsolutePath());
                }
            }
        }
    
/**
 * 遍历/datasource/文件夹及其子文件夹,只要存在xml文件就进行解析,并把解析完的xml文件转移到备份文件夹/datasource_backup/,
 * 注意:
 * 
 * @param 
 * @return 
 */
public void traverDirs()
{
System.out.println("Start!");
String tableName = fieldDicts.appItemTableNameInHbase;
HbaseInterface.createTable(tableName,"INFO",-1);

String dir_source = "/data/search_datasource_backup/";//原始文件目录

String typePaths[] = {"1/2014","2/2014","5/2014","10/2014","11/2014"}; 

for(int i=0;i<typePaths.length;i++)
{
try{
//遍历子目录
String dirpath = dir_source+typePaths[i];
refreshFileList(dirpath);
}catch (Exception e) {
LOG.error("Fail to deal with folder: "+dir_source+typePaths[i]);
LOG.error("[OVERALL ERROR] ", e);
}
}
}
}

------解决方案--------------------
简单写了一个例子,代码没进行测试。这BIO方式我是不太赞成的。
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
  相关解决方案