当前位置: 代码迷 >> java >> 控制异步网络消息传递设计速率的Communication Manager
  详细解决方案

控制异步网络消息传递设计速率的Communication Manager

热度:91   发布时间:2023-07-16 17:30:47.0

我有一个需要访问的系统,对该系统的访问率为每秒1个API。 但是,我希望通过异步的Web界面提供对它的访问。 我有一个设计,需要一个专用的通信管理器线程,该线程将消息收集到队列中,并一次发送一条消息,然后回叫给发送者并发送消息结果。

这是一个好方法吗? 您目前在代码中看到任何明显的陷阱吗?

public class CommunicationManager implements Runnable
{
    private BlockingQueue<Message> message = new LinkedList<> ();

    private boolean shutdown = false;

    private Messenger messenger = new Messenger();

    public CommunicationManager() {}

    public void run()
    {
        long elapsed, start, diff;
        start = 0;

        while (!shutdown)
        {
            elapsed = System.currentTimeMillis();
            diff = elapsed - start;
            if (diff < 1000)
            {
                Thread.sleep(1000 - diff);
            }
            if (!message.isEmpty())
            {
                Message next = message.remove();
                next.getSender().recieve(messenger.send(next.getMessage()));
            }
            start = elapsed;
        }           
    }

    public synchronized void addMessage(Sender sender, String message)
    {
        this.message.add(new Message(sender, message));
    }

    public synchronized void shutdown()
    {
        this.shutdown = true;
    }
}

该管理器的希望结果是,每个循环(如果自上一个循环开始以来未经过一秒钟),它将在剩余时间内休眠。 然后它将检查队列是否为空。 如果队列不为空,它将检索队列中的下一条消息,发送该消息并将结果返回给发件人的回调。 然后循环结束,并再次开始循环。

我使用BlockingQueue来避免在删除队列中的最后一条消息时有人将消息添加到队列中的问题。 我认为默认的Queue结构不是线程安全的,因此我需要一些方法来防止这种情况的发生。

事实证明,我在这里重新发明了轮子。 我真正想做的事情可以使用Java ScheduledExecutorService轻松完成。

class CommunicationControl
{
   private final ScheduledExecutorService scheduluer = Executors.newScheduledThreadPool(1);

   public void startManager()
   {
      final CommunicationManager manager = new CommunicationManager();
      scheduler.scheduleAtFixedRate(manager, 10, 1, SECONDS);
   }

   public void stopManager()
   {
      while (manager.getMessageCount() > 0)
      {
          try
          {
              Thread.sleep(manager.getMessageCount() * 1000);
          } catch (InterruptedException e)
          {
              e.printStackTrace();
          }
      }
      scheduler.shutdown();
   }
}

原始班级的改编在这里:

class CommunicationManager implements Runnable
{
   private BlockingQueue<Message> message = new ArrayBlockingQueue<Message> (1000);
   private Messenger messenger = new Messenger();

   public CommunicationManager() {}

   public void run()
   {
      Message next = message.poll();
      if (next != null)
      {
         next.getSender().recieve(messenger.send(next.getMessage()));
      }
   }

   public void addMessage(Sender sender, String message)
   {
       try
       {
          while (!this.message.offer(new Message(sender, message), 1, TimeUnit.SECONDS)) {}
       } catch (InterruptedException e)
       {
          e.printStackTrace();
       }
   }
}

首先要做的是创建一个执行程序来处理Communication Manager的运行频率,在这种情况下,它是在延迟10秒后每秒执行一次。 然后,一旦每秒发生一次,CommuncationManager的Run方法将执行恰好执行一个工作单元(如果可用)的操作。

只要有队列空间,其他方法就可以随意向CommunicationManager添加工作,否则,它们将阻塞直到空间可用。

这也使我能够将其构建到诸如Spring之类的框架中。

  相关解决方案