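I. MemoryChannel properties
1. Basic properties
// Maximum number of events the queue can hold:
private static final Integer defaultCapacity = 100;
// Maximum number of events allowed in a single transaction:
private static final Integer defaultTransCapacity = 100;
// Slot size used to convert physical memory (bytes) into slots; the default is 100, and the code further down shows how it is used:
private static final double byteCapacitySlotSize = 100;
// Maximum number of bytes the events in the queue may occupy (by default, 0.8 of the maximum memory available to the JVM):
private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80);
// Buffer percentage kept between byteCapacity and the estimated event size:
private static final Integer defaultByteCapacityBufferPercentage = 20;
// Timeout, in seconds, for adding or removing an event:
private static final Integer defaultKeepAlive = 3;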
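2. MemoryChannel has an inner class, MemoryTransaction, which is the most important class for the transaction guarantees; Part III below covers it in detail.
3. An Object is created to serve as the queue lock, which keeps the data consistent while the queue is being modified.
// lock to guard queue, mainly needed to keep it locked down during resizes
// it should never be held through a blocking operation
private Object queueLock = new Object();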
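4. A LinkedBlockingDeque named queue holds the events; the source and the sink sit at its two ends.
queueStored tracks the number of events currently held in queue, so a later tryAcquire call can tell whether an event is available to take. queueRemaining tracks the capacity still free in queue, so it can tell whether a given number of events can be committed to queue.
private LinkedBlockingDeque<Event> queue;
private Semaphore queueRemaining;
private Semaphore queueStored;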
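To make the pairing of the two semaphores concrete, here is a minimal sketch, not Flume code. The class TwoSemaphoreQueue and its methods tryPut and tryTake are hypothetical names chosen for illustration, and the sketch uses the non-blocking tryAcquire() instead of the timed variant that MemoryChannel uses.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;

// Hypothetical toy class, not part of Flume: a bounded queue guarded by two counting semaphores.
class TwoSemaphoreQueue {
    private final LinkedBlockingDeque<String> queue;
    private final Semaphore remaining; // free slots, plays the role of queueRemaining
    private final Semaphore stored;    // stored items, plays the role of queueStored

    TwoSemaphoreQueue(int capacity) {
        queue = new LinkedBlockingDeque<>(capacity);
        remaining = new Semaphore(capacity); // empty queue: every slot is free
        stored = new Semaphore(0);           // empty queue: nothing to take
    }

    // Returns false instead of blocking when no free slot is left.
    boolean tryPut(String item) {
        if (!remaining.tryAcquire()) {
            return false;
        }
        queue.offer(item);
        stored.release(); // one more item is now available to take
        return true;
    }

    // Returns null instead of blocking when nothing is stored.
    String tryTake() {
        if (!stored.tryAcquire()) {
            return null;
        }
        String item = queue.poll();
        remaining.release(); // the slot is free again
        return item;
    }

    int freeSlots() { return remaining.availablePermits(); }
    int storedItems() { return stored.availablePermits(); }
}
```

In this sketch the two permit counts always add up to the capacity once an operation has finished, which is the invariant the real channel maintains across commits.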
5. There is a field private ChannelCounter channelCounter;, which wraps some of the channel's metrics. The class begins as follows:
public class ChannelCounter extends MonitoredCounterGroup implements ChannelCounterMBean {
    private static final String COUNTER_CHANNEL_SIZE = "channel.current.size";
    private static final String COUNTER_EVENT_PUT_ATTEMPT = "channel.event.put.attempt";
    private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt";
    private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success";
    private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success";
    private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity";
6. A method estimates the size of an Event (the number of bytes it occupies):
private long estimateEventSize(Event event) {
    byte[] body = event.getBody();
    if (body != null && body.length != 0) {
        return body.length;
    }
    // An Event with an empty body is estimated at one byte
    // Each event occupies at least 1 slot, so return 1.
    return 1;
}
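The byte estimate is later turned into a slot count by dividing by byteCapacitySlotSize and rounding up, as doPut and doTake do. The following sketch reproduces that arithmetic on a plain byte array; the class SlotEstimate and its method names are hypothetical, and it works on byte[] directly so that it needs no Flume dependency.

```java
// Hypothetical helper, not part of Flume: the size estimate plus the slot conversion.
class SlotEstimate {
    static final double SLOT_SIZE = 100; // mirrors byteCapacitySlotSize

    // Body length in bytes, with a floor of 1 for a null or empty body.
    static long estimateSize(byte[] body) {
        return (body != null && body.length != 0) ? body.length : 1;
    }

    // Number of slots, rounded up so that every event costs at least one slot.
    static int slotsFor(byte[] body) {
        return (int) Math.ceil(estimateSize(body) / SLOT_SIZE);
    }
}
```

With a slot size of 100, a 100-byte body costs one slot and a 101-byte body costs two.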
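II. MemoryChannel variable initialization
1. capacity, transCapacity, byteCapacityBufferPercentage and keepAlive each take either the default or the configured value. byteCapacity is different: it is expressed in slots rather than bytes. For example, with -Xmx set to 2048m and every other parameter left at its default, byteCapacity = ((2048*0.8)*1024*1024*0.8)/100, truncated to an integer. The first 0.8 comes from defaultByteCapacity and the second from the 20% buffer.
byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() *
    (1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize);
// If a badly chosen setting makes byteCapacity drop below 1, set byteCapacity to Integer.MAX_VALUE
if (byteCapacity < 1) {
    byteCapacity = Integer.MAX_VALUE;
}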
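The worked example can be checked with a few lines of Java. This is only a sketch of the arithmetic: ByteCapacityMath and slots are hypothetical names, and it assumes that Runtime.getRuntime().maxMemory() returns exactly the -Xmx value, which a real JVM may not do.

```java
// Hypothetical helper, not part of Flume: the byteCapacity arithmetic in isolation.
class ByteCapacityMath {
    static int slots(long maxMemoryBytes, int bufferPercentage, double slotSize) {
        // Same as defaultByteCapacity: 80% of the maximum heap, truncated to a long.
        long defaultByteCapacity = (long) (maxMemoryBytes * .80);
        // Subtract the buffer percentage, then convert bytes to slots.
        int byteCapacity = (int) ((defaultByteCapacity * (1 - bufferPercentage * .01)) / slotSize);
        // Same fallback as the initialization code above.
        return byteCapacity < 1 ? Integer.MAX_VALUE : byteCapacity;
    }

    public static void main(String[] args) {
        // Assumption: maxMemory() equals 2048 MiB exactly.
        System.out.println(slots(2048L * 1024 * 1024, 20, 100));
    }
}
```

Under that assumption the defaults give 13743895 slots, that is, roughly 1.37 GB of event bodies.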
2. Creating the queue and its semaphores
// Create the queue with the configured capacity
queue = new LinkedBlockingDeque<Event>(capacity);
// The queue starts out empty, so queueRemaining is at its maximum, capacity
queueRemaining = new Semaphore(capacity);
// The queue starts out empty, so queueStored is at its minimum, 0
queueStored = new Semaphore(0);
// Space (in slots) the queue can still hold; initially the full byteCapacity
bytesRemaining = new Semaphore(byteCapacity);
lastByteCapacity = byteCapacity;
// Create the channelCounter object from the channel's name
channelCounter = new ChannelCounter(getName());
III. The MemoryTransaction class
1. Basic properties
// Queue used by take transactions:
private LinkedBlockingDeque<Event> takeList;
// Queue used by put transactions:
private LinkedBlockingDeque<Event> putList;
// Channel metrics:
private final ChannelCounter channelCounter;
// Space counter for puts (accumulated in slots)
private int putByteCounter = 0;
// Space counter for takes (accumulated in slots)
private int takeByteCounter = 0;
2. Constructor
public MemoryTransaction(int transCapacity, ChannelCounter counter) {
    putList = new LinkedBlockingDeque<Event>(transCapacity);
    takeList = new LinkedBlockingDeque<Event>(transCapacity);
    channelCounter = counter;
}
3. Walkthrough of the core methods
- Method 1: doPut()
@Override
protected void doPut(Event event) throws InterruptedException {
    channelCounter.incrementEventPutAttemptCount();
    // Estimate the number of slots the event occupies
    int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    // offer adds the event to putList
    // LinkedBlockingDeque has three methods for adding an element (add, put and offer); note how they differ
    if (!putList.offer(event)) {
        throw new ChannelException("Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
    }
    // Add the slots used by this event to the running total
    putByteCounter += eventByteSize;
}
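The difference between add, put and offer only shows once the deque is full. The sketch below probes a full LinkedBlockingDeque of capacity 1; DequeInsertDemo and probeFullDeque are hypothetical names, and put is deliberately not called on the full deque because it would block until another thread removed an element.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: how each insert method reacts to a full deque.
class DequeInsertDemo {
    static String[] probeFullDeque() {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(1);
        deque.offer("first"); // fills the only slot

        // offer returns false immediately, which is why doPut can turn it into a ChannelException.
        String offerResult = deque.offer("second") ? "accepted" : "rejected";

        // The timed offer waits up to the timeout before giving up.
        String timedOfferResult;
        try {
            timedOfferResult = deque.offer("second", 50, TimeUnit.MILLISECONDS) ? "accepted" : "rejected";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timedOfferResult = "interrupted";
        }

        // add signals a full deque with an unchecked exception.
        String addResult;
        try {
            deque.add("second");
            addResult = "accepted";
        } catch (IllegalStateException e) {
            addResult = "threw IllegalStateException";
        }
        return new String[] {offerResult, timedOfferResult, addResult};
    }
}
```

Using offer lets doPut fail fast with a descriptive ChannelException rather than block the source thread or surface a generic IllegalStateException.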
- Method 2: doTake()
@Override
protected Event doTake() throws InterruptedException {
    channelCounter.incrementEventTakeAttemptCount();
    // Check whether takeList still has room
    if (takeList.remainingCapacity() == 0) {
        // No room left, so throw an exception
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
    }
    // Check, via the queueStored semaphore, whether the MemoryChannel's queue holds an event to take;
    // if none shows up within keepAlive seconds, return null
    if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
    }
    Event event;
    synchronized (queueLock) {
        // Pop one event from the head of queue
        event = queue.poll();
    }
    Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
        "signalling existence of entry");
    // Put it into takeList
    takeList.put(event);
    // Estimate the space this Event occupies (in slots)
    int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    // Add it to the running total for takeList
    takeByteCounter += eventByteSize;
    // Return the Event that was taken
    return event;
}
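The "wait up to keepAlive, then return null" branch of doTake relies on the timed form of Semaphore.tryAcquire. A minimal sketch of that behaviour follows; TimedTakeDemo and takeOrNull are hypothetical names, the string "event" stands in for a real Event, and the timeout is given in milliseconds to keep the example quick.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: timed acquisition that gives up with null.
class TimedTakeDemo {
    // Waits up to timeoutMillis for a permit; returns null when none becomes available.
    static String takeOrNull(Semaphore stored, long timeoutMillis) {
        try {
            if (!stored.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return null; // nothing stored within the timeout
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
        return "event"; // placeholder for the polled Event
    }
}
```

A null return is what lets a sink such as KafkaSink treat an empty channel as a normal condition and back off instead of failing.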
- Method 3: doCommit()
@Override
protected void doCommit() throws InterruptedException {
    // Difference between the number of Events in takeList and in putList
    int remainingChange = takeList.size() - putList.size();
    // If takeList is the smaller one, more data is being put into the MemoryChannel than taken out,
    // so check whether the MemoryChannel has room for it
    if (remainingChange < 0) {
        // 1. First use the semaphore to check whether enough byte space is left
        // Note: on success, tryAcquire reduces bytesRemaining by putByteCounter
        // tryAcquire returns true if at least putByteCounter permits become available within keepAlive seconds
        if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
            throw new ChannelException("Cannot commit transaction. Byte capacity " +
                "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
                "reached. Please increase heap space/byte capacity allocated to " +
                "the channel as the sinks may not be keeping up with the sources");
        }
        // 2. Then check whether enough queue space can be acquired within keepAlive seconds
        if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
            bytesRemaining.release(putByteCounter);
            throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
                " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
    }
    int puts = putList.size();
    int takes = takeList.size();
    // Both checks passed, so move the Events in putList into the MemoryChannel's queue
    synchronized (queueLock) {
        if (puts > 0) {
            // Move the Events from putList into queue one by one
            while (!putList.isEmpty()) {
                if (!queue.offer(putList.removeFirst())) {
                    throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
                }
            }
        }
        // With that done, clear putList and takeList; the transaction is complete
        putList.clear();
        takeList.clear();
    }
    // Release byte space: give back the space of the Events that were taken
    bytesRemaining.release(takeByteCounter);
    // One subtraction and one addition have updated bytesRemaining; now reset both counters
    takeByteCounter = 0;
    putByteCounter = 0;
    // Add puts to queueStored to update that semaphore
    queueStored.release(puts);
    // If takeList is larger than putList, the queue of this MemoryChannel has shrunk,
    // so add the difference (takeList - putList) to the queueRemaining semaphore
    if (remainingChange > 0) {
        queueRemaining.release(remainingChange);
    }
    // Update the three values in channelCounter
    if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
    }
    if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
    }
    channelCounter.setChannelSize(queue.size());
}
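The permit arithmetic in doCommit is easier to follow with the queue and byte accounting stripped away. The sketch below models only queueRemaining and queueStored; CommitBookkeeping is a hypothetical class, it uses the non-blocking tryAcquire rather than the keepAlive timeout, and it assumes that each take has already acquired one queueStored permit, as doTake does.

```java
import java.util.concurrent.Semaphore;

// Hypothetical model, not Flume code: only the event-count semaphores of a commit.
class CommitBookkeeping {
    final Semaphore queueRemaining;
    final Semaphore queueStored = new Semaphore(0);

    CommitBookkeeping(int capacity) {
        queueRemaining = new Semaphore(capacity);
    }

    // Applies the permit changes of one commit; returns false if the queue lacks room.
    boolean commit(int puts, int takes) {
        int remainingChange = takes - puts;
        // Net growth of the queue: reserve the extra slots up front.
        if (remainingChange < 0 && !queueRemaining.tryAcquire(-remainingChange)) {
            return false;
        }
        // Every committed put becomes available to takers.
        queueStored.release(puts);
        // Net shrinkage of the queue: hand the freed slots back.
        if (remainingChange > 0) {
            queueRemaining.release(remainingChange);
        }
        return true;
    }
}
```

Only the net difference between takes and puts touches queueRemaining, so a transaction that takes as many events as it puts never has to wait for queue space.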
- Method 4: doRollback()
// When a transaction fails it is rolled back, which means this method is called
@Override
protected void doRollback() {
    // Number of Events in takeList
    int takes = takeList.size();
    // First put the Events in takeList back into the MemoryChannel's queue
    synchronized (queueLock) {
        // Check that queue has enough room to take the Events in takeList back
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
            "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        // Remove Events from the tail of takeList one by one and insert each at the head of queue
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.removeLast());
        }
        // Then clear putList
        putList.clear();
    }
    putByteCounter = 0;
    takeByteCounter = 0;
    // The taken Events are back in queue and can be taken again, so return their permits to queueStored
    queueStored.release(takes);
    channelCounter.setChannelSize(queue.size());
}
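Taking from the tail of takeList and inserting at the head of queue is what restores the original event order. A small sketch with strings in place of Events illustrates this; RollbackOrderDemo and takeTwoThenRollback are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class: rollback puts taken events back in their original order.
class RollbackOrderDemo {
    static List<String> takeTwoThenRollback() {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(4);
        LinkedBlockingDeque<String> takeList = new LinkedBlockingDeque<>(4);
        queue.add("e1");
        queue.add("e2");
        queue.add("e3");

        // Two takes: events move from the head of queue to the tail of takeList.
        takeList.add(queue.poll());
        takeList.add(queue.poll());

        // Rollback: the most recently taken event goes back first, each one to the head of queue.
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.removeLast());
        }
        return new ArrayList<>(queue);
    }
}
```

After the rollback the queue again reads e1, e2, e3, so a retried sink sees the events in the order the source committed them.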
IV. A complete put/take transaction flow
- put transaction
In org.apache.flume.channel.ChannelProcessor.java:
for (Channel reqChannel : requiredChannels) {
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
        tx.begin();
        // Under the hood this calls the doPut method
        reqChannel.put(event);
        tx.commit();
    } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
            LOG.error("Error while writing to required channel: " + reqChannel, t);
            throw (Error) t;
        } else if (t instanceof ChannelException) {
            throw (ChannelException) t;
        } else {
            throw new ChannelException("Unable to put event on required " +
                "channel: " + reqChannel, t);
        }
    } finally {
        if (tx != null) {
            tx.close();
        }
    }
}
- take transaction
Taking KafkaSink as an example, in org.apache.flume.sink.kafka.KafkaSink.java:
transaction = channel.getTransaction();
transaction.begin();
kafkaFutures.clear();
long batchStartTime = System.nanoTime();
for (; processedEvents < batchSize; processedEvents += 1) {
    event = channel.take();
    if (event == null) {
        // no events available in channel
        if (processedEvents == 0) {
            result = Status.BACKOFF;
            counter.incrementBatchEmptyCount();
        } else {
            counter.incrementBatchUnderflowCount();
        }
        break;
    }
    counter.incrementEventDrainAttemptCount();
    byte[] eventBody = event.getBody();
    Map<String, String> headers = event.getHeaders();
    if (allowTopicOverride) {
        eventTopic = headers.get(topicHeader);
        if (eventTopic == null) {
            eventTopic = BucketPath.escapeString(topic, event.getHeaders());
            logger.debug("{} was set to true but header {} was null. Producing to {}" + " topic instead.",
                new Object[]{KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, topicHeader, eventTopic});
        }
    } else {
        eventTopic = topic;
    }
    eventKey = headers.get(KEY_HEADER);
    if (logger.isTraceEnabled()) {
        if (LogPrivacyUtil.allowLogRawData()) {
            logger.trace("{Event} " + eventTopic + " : " + eventKey + " : "
                + new String(eventBody, "UTF-8"));
        } else {
            logger.trace("{Event} " + eventTopic + " : " + eventKey);
        }
    }
    logger.debug("event #{}", processedEvents);
    // create a message and add to buffer
    long startTime = System.currentTimeMillis();
    Integer partitionId = null;
    try {
        ProducerRecord<String, byte[]> record;
        if (staticPartitionId != null) {
            partitionId = staticPartitionId;
        }
        //Allow a specified header to override a static ID
        if (partitionHeader != null) {
            String headerVal = event.getHeaders().get(partitionHeader);
            if (headerVal != null) {
                partitionId = Integer.parseInt(headerVal);
            }
        }
        if (partitionId != null) {
            record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
                serializeEvent(event, useAvroEventFormat));
        } else {
            record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
                serializeEvent(event, useAvroEventFormat));
        }
        kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
    } catch (NumberFormatException ex) {
        throw new EventDeliveryException("Non integer partition id specified", ex);
    } catch (Exception ex) {
        // N.B. The producer.send() method throws all sorts of RuntimeExceptions
        // Catching Exception here to wrap them neatly in an EventDeliveryException
        // which is what our consumers will expect
        throw new EventDeliveryException("Could not send event", ex);
    }
}
//Prevent linger.ms from holding the batch
producer.flush();
// publish batch and commit.
if (processedEvents > 0) {
    for (Future<RecordMetadata> future : kafkaFutures) {
        future.get();
    }
    long endTime = System.nanoTime();
    counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000));
    counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size()));
}
transaction.commit();
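Summary
The logic of MemoryChannel is fairly simple: the putList and takeList inside MemoryTransaction exchange Events with the queue inside MemoryChannel. The queue plays the role of the persistence layer, except that it lives in memory; with FileChannel, the queue would be kept in local files instead. The line below shows how an Event flows through an agent that uses MemoryChannel:
source ---> putList ---> queue ---> takeList ---> sink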
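The whole flow can be condensed into a toy model. The sketch below is a single-threaded simplification under stated assumptions: ToyChannel and its inner class Tx are hypothetical, events are plain strings, and the semaphores, the queue lock and the capacity limits of the real MemoryChannel are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model, not Flume code: staged puts and takes around a shared queue.
class ToyChannel {
    final Deque<String> queue = new ArrayDeque<>();

    class Tx {
        private final Deque<String> putList = new ArrayDeque<>();
        private final Deque<String> takeList = new ArrayDeque<>();

        // source side: events are staged and stay invisible to sinks until commit
        void put(String event) {
            putList.addLast(event);
        }

        // sink side: events leave the queue but are remembered in case of rollback
        String take() {
            String event = queue.pollFirst();
            if (event != null) {
                takeList.addLast(event);
            }
            return event;
        }

        void commit() {
            while (!putList.isEmpty()) {
                queue.addLast(putList.removeFirst());
            }
            takeList.clear();
        }

        void rollback() {
            while (!takeList.isEmpty()) {
                queue.addFirst(takeList.removeLast());
            }
            putList.clear();
        }
    }

    Tx newTransaction() {
        return new Tx();
    }
}
```

A source transaction would call put and then commit, and a sink transaction would call take and then either commit or rollback, which matches the source, putList, queue, takeList, sink path shown above.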