当前位置: 代码迷 >> J2EE >> [讨论]在业务中使用发布/订阅模式,如何避免和事务之间的关系
  详细解决方案

[讨论]在业务中使用发布/订阅模式,如何避免和事务之间的关系

热度:581   发布时间:2016-04-21 20:30:49.0
[讨论]在业务中使用发布/订阅模式,如何处理和事务之间的关系?
假设业务场景如:用户发布博文,文章需要做索引供全文检索,记录操作日志以及计数功能(个人博客总数,某分类下博客数);其中如果计数服务出现异常事务应该回滚,索引和日志出异常应不影响博客保存。
@Transactional(propagation=Propagation.REQUIRED)
public Blog save(Blog blog) {

  //持久化,可能出现数据库异常,如主键约束,字段过大
  dao.save(blog);
  //发布 新增博客 事件,订阅者有 索引服务,日志服务,计数服务
  fireEvent(blogEvent);//可同步或异步处理事件

return blog;
}

问题:
 使用spring控制事务,当事务commit时,才可能抛出底层异常;但此时fireEvent方法已经执行完成了;结果是博文并未保存但触发了索引、日志服务,有什么好的方法处理这种情况吗?
我想到方法是:在之上在加一层代理,fireEvent移除至事务之外,
	public Blog proxySave(Blog blog){
//事务层save
save(blogEvent); //有影响事务 计数服务 放在里面
  
  //发布 新增博客 事件,订阅者有 索引服务,日志服务
fireEvent(blogEvent);
return blog;
}

这样做不好之处,多增加一层增加码量;fireEvent事件可以做成异步队列,是否可以借助filter 在chain.doFilter(request, response)后,判断当前线程事务是否提交,在决定fireEvent 中异步订阅者是否发布执行?不知各位有什么好的意见,类似这种业务如何处理?
------解决方案--------------------
我在公司的项目里是这样解决这个问题的,我们也用到了Spring的声明式事务,因为是AOP的方式所以只有在方法执行完成后才会去commit或者rollback,这里需要在如果rollback后完成一些善后动作.

我们的做法是这样.


public class OnCallDataSource implements DataSource {
  private DataSource dataSource;

    public OnCallDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
        DataSourceUtil.OnCallHouse.allowCall();
    }

    public Connection getConnection() throws SQLException {
        return new OnCallConnection(dataSource.getConnection());
    }

    //其他方法都是直接委托给原来的DataSource.
}


对.就是返回一个我们自己的Connection实现,这个实现包装了原有的正常Connection.


public class OnCallConnection implements Connection {

    private Connection conn;

    public OnCallConnection(Connection conn) throws SQLException {
        this.conn = conn;
        this.conn.setAutoCommit(false);
    }

    public void commit() throws SQLException {
        this.conn.commit();
        executeCalls(
                DataSourceUtil.OnCallHouse.getCalls(
                DataSourceUtil.OnCallHouse.COMMIT_CALLS));
    }

    public void rollback() throws SQLException {
        this.conn.rollback();
        executeCalls(
                DataSourceUtil.OnCallHouse.getCalls(
                DataSourceUtil.OnCallHouse.ROLLBACK_CALLS));
    }

    public void rollback(Savepoint savepoint) throws SQLException {
        this.conn.rollback(savepoint);
        executeCalls(DataSourceUtil.OnCallHouse.getCalls(
                DataSourceUtil.OnCallHouse.ROLLBACK_CALLS));
    }

    private void executeCalls(List<Runnable> calls) {
        if (calls.isEmpty()) {
            return;
        }

        Runnable call = null;
        int size = calls.size();
        for (int i = 0; i < size; i++) {
            call = calls.get(i);
            try {
                call.run();
            } catch (Exception ex) {
                LOG.errorLog(ex);
            }
        }
        DataSourceUtil.OnCallHouse.clear();
    }

//同OnCallDataSource
}


最后是那个临时储存回调方法的地方OnCallHouse其实就是利用ThreadLocal来储存回调任务列表的.
  相关解决方案