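Flume provides reliable log collection, and that reliability is achieved through a transaction mechanism. In this part we look at how Channel transactions are implemented in MemoryChannel and FileChannel.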

Let's start with the BasicChannelSemantics implementation:
Java code:
    package org.apache.flume.channel;

    import com.google.common.base.Preconditions;
    import org.apache.flume.ChannelException;
    import org.apache.flume.Event;
    import org.apache.flume.Transaction;

    public abstract class BasicChannelSemantics extends AbstractChannel {
      // 1. Transactions are stored in a ThreadLocal, so each thread works on its own transaction
      private ThreadLocal<BasicTransactionSemantics> currentTransaction
          = new ThreadLocal<BasicTransactionSemantics>();
      private boolean initialized = false;

      // 2. Hook for one-time initialization work
      protected void initialize() {}

      // 3. Callback through which subclasses create transactions
      protected abstract BasicTransactionSemantics createTransaction();

      // 4. Putting an Event into the Channel is delegated directly to the transaction's put method
      @Override
      public void put(Event event) throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        transaction.put(event);
      }

      // 5. Taking an Event from the Channel is likewise delegated to the transaction's take method
      @Override
      public Event take() throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        return transaction.take();
      }

      // 6. Get a transaction: initialize this instance first if that has not happened yet. Then look
      //    up the transaction in the ThreadLocal; if there is none, or it is closed, create a new one
      //    and bind it to the ThreadLocal
      @Override
      public Transaction getTransaction() {
        if (!initialized) {
          synchronized (this) {
            if (!initialized) {
              initialize();
              initialized = true;
            }
          }
        }
        BasicTransactionSemantics transaction = currentTransaction.get();
        if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
          transaction = createTransaction();
          currentTransaction.set(transaction);
        }
        return transaction;
      }
    }
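Here is a minimal standalone sketch of the same getTransaction() pattern. It shows the per-thread binding and the lazy initialization in isolation. The class PerThreadTransactions and its nested Tx type are hypothetical stand-ins for BasicChannelSemantics and BasicTransactionSemantics, and a single closed flag replaces Flume's transaction states.

The sketch differs from the excerpt in one deliberate way. Its initialized flag is declared volatile, which is the textbook-safe form of double-checked locking under the Java memory model. The excerpt above uses a plain boolean.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of BasicChannelSemantics.getTransaction(): one transaction per thread,
// replaced only once the previous one has been closed.
final class PerThreadTransactions {
  // Stand-in for BasicTransactionSemantics; it only tracks whether it has been closed.
  static final class Tx {
    final int id;
    private volatile boolean closed = false;

    Tx(int id) { this.id = id; }

    void close() { closed = true; }

    boolean isClosed() { return closed; }
  }

  private final ThreadLocal<Tx> currentTransaction = new ThreadLocal<>();
  private final AtomicInteger nextId = new AtomicInteger();
  // volatile so that threads skipping the synchronized block still see a fully initialized object
  private volatile boolean initialized = false;
  private int initCount = 0;

  // Stand-in for initialize(): just counts how often it runs.
  private void initialize() { initCount++; }

  int initCount() {
    synchronized (this) { return initCount; }
  }

  Tx getTransaction() {
    if (!initialized) {                  // fast path once initialized
      synchronized (this) {
        if (!initialized) {              // re-check under the lock
          initialize();
          initialized = true;
        }
      }
    }
    Tx tx = currentTransaction.get();
    if (tx == null || tx.isClosed()) {   // no transaction yet, or the old one is finished
      tx = new Tx(nextId.incrementAndGet());
      currentTransaction.set(tx);
    }
    return tx;
  }
}
```

Within one thread, getTransaction() keeps returning the same object until that object is closed. A different thread gets its own instance, and initialize() runs only once per channel object.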
 
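MemoryChannel transaction implementation
Let's begin with MemoryChannel, a purely in-memory Channel implementation in which all transactional work happens in memory. Its memory layout is as follows:
1. A single Channel Queue stores the Event data of the whole Channel.
2. Each transaction has its own Take Queue and Put Queue, which hold the events the transaction has taken and put, respectively. Only when the transaction commits are the put events moved into the Channel Queue. If the transaction fails, the taken events are rolled back into the Channel Queue.
MemoryChannel is designed around two capacities, the Channel Queue capacity and the transaction capacity. Each is limited both by event count and by bytes.
Multiple transactions operate on the Channel Queue, and the Channel Queue can also be resized dynamically, so MemoryChannel guards the queue with a lock. Capacity is tracked with semaphores.
The configure method initializes parameters such as the capacities and the Channel Queue. First, here is how the Channel Queue capacity is computed: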
Java code:
    try {
      capacity = context.getInteger("capacity", defaultCapacity);
    } catch (NumberFormatException e) {
      capacity = defaultCapacity;
    }
    if (capacity <= 0) {
      capacity = defaultCapacity;
    }
 
The count capacity is read from the configuration first. The default capacity (100) is used if the value is not configured, cannot be parsed as a number, or is less than or equal to 0.
Next, the transaction count capacity is initialized:
Java code:
    try {
      transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
    } catch (NumberFormatException e) {
      transCapacity = defaultTransCapacity;
    }
    if (transCapacity <= 0) {
      transCapacity = defaultTransCapacity;
    }
    Preconditions.checkState(transCapacity <= capacity,
        "Transaction Capacity of Memory Channel cannot be higher than " +
        "the capacity.");
 
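The process mirrors the Channel Queue count capacity, except that it ends with a precondition check: the transaction capacity must be less than or equal to the Channel Queue capacity.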
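The two resolution steps and the invariant can be condensed into a small standalone sketch. The sketch assumes a plain Map<String, String> as a hypothetical stand-in for Flume's Context. The class name CapacitySettings is also hypothetical, and the defaults of 100 come from the text above.

```java
import java.util.Map;

// Hypothetical sketch: resolves capacity and transactionCapacity the way the excerpts do.
// A Map<String, String> stands in for Flume's Context (assumption).
final class CapacitySettings {
  static final int DEFAULT_CAPACITY = 100;
  static final int DEFAULT_TRANS_CAPACITY = 100;

  final int capacity;
  final int transCapacity;

  CapacitySettings(Map<String, String> props) {
    this.capacity = positiveOrDefault(props.get("capacity"), DEFAULT_CAPACITY);
    this.transCapacity = positiveOrDefault(props.get("transactionCapacity"), DEFAULT_TRANS_CAPACITY);
    // Same invariant as the Preconditions.checkState call (which throws IllegalStateException).
    if (transCapacity > capacity) {
      throw new IllegalStateException(
          "Transaction Capacity of Memory Channel cannot be higher than the capacity.");
    }
  }

  // Missing, unparsable, or non-positive values all fall back to the default.
  static int positiveOrDefault(String raw, int defaultValue) {
    if (raw == null) {
      return defaultValue;
    }
    int value;
    try {
      value = Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      return defaultValue;
    }
    return value <= 0 ? defaultValue : value;
  }
}
```

This logic has a practical consequence. Lowering only capacity below 100 (say to 50) while leaving transactionCapacity unset makes transCapacity fall back to 100, so the precondition fails. Both values need to be lowered together.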
Next comes the byte capacity limit:
Java code:
    try {
      byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage",
          defaultByteCapacityBufferPercentage);
    } catch (NumberFormatException e) {
      byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;
    }
    try {
      byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue()
          * (1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize);
      if (byteCapacity < 1) {
        byteCapacity = Integer.MAX_VALUE;
      }
    } catch (NumberFormatException e) {
      byteCapacity = (int) ((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01))
          / byteCapacitySlotSize);
    }
 
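byteCapacityBufferPercentage: a percentage used to derive byteCapacity by reserving part of the configured byte capacity as a buffer. The configured byte capacity is meant to cover Event bodies only and ignores Event headers, so the memory taken by headers has to be subtracted. You can read this parameter as the percentage of the byte capacity that Event headers occupy. The default is 20%.
byteCapacity: the byteCapacity from the configuration file is read first. If it is not defined, defaultByteCapacity is used, which is 80% of the JVM's maximum heap size (Runtime.getRuntime().maxMemory() * .80), not of physical memory. The effective byteCapacity is then: configured byteCapacity * (1 - header percentage) / byteCapacitySlotSize. byteCapacitySlotSize is 100, meaning that byte capacity is accounted in slots of 100 bytes rather than in single bytes, so byteCapacity ends up as a slot count. If the result is below 1 (for example when byteCapacity is configured as 0), byteCapacity is set to Integer.MAX_VALUE, which effectively removes the byte limit.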
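As a worked example, suppose byteCapacity is configured as 1,000,000 bytes and the default 20% buffer applies. The channel then tracks 1,000,000 × (1 - 0.20) / 100 = 8,000 slots.

The sketch below reproduces that arithmetic. The class name ByteCapacityMath is hypothetical, and the constants mirror the values described above.

```java
// Hypothetical sketch of the byte-capacity arithmetic in configure().
final class ByteCapacityMath {
  static final int BYTE_CAPACITY_SLOT_SIZE = 100; // bytes per accounting slot

  // Converts a configured byte limit into the number of 100-byte slots the channel tracks.
  static int slots(long configuredBytes, int bufferPercentage) {
    int slots = (int) ((configuredBytes * (1 - bufferPercentage * .01)) / BYTE_CAPACITY_SLOT_SIZE);
    // As in the excerpt: a result below one slot disables the byte limit.
    return slots < 1 ? Integer.MAX_VALUE : slots;
  }

  // Default byte capacity: 80% of the JVM's maximum heap.
  static long defaultByteCapacity() {
    return (long) (Runtime.getRuntime().maxMemory() * .80);
  }
}
```

For instance, slots(1_000_000L, 20) returns 8000 and slots(1_000_000L, 0) returns 10000. slots(0, 20) returns Integer.MAX_VALUE.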
Next, the keepAlive parameter is read:
Java code:
    try {
      keepAlive = context.getInteger("keep-alive", defaultKeepAlive);
    } catch (NumberFormatException e) {
      keepAlive = defaultKeepAlive;
    }
 
keepAlive is the timeout for waiting on Channel Queue operations. It is passed to the semaphore tryAcquire calls in seconds, and the default is 3 seconds.
Next, the Channel Queue is initialized:
Java code:
    if (queue != null) {
      try {
        resizeQueue(capacity);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      synchronized (queueLock) {
        queue = new LinkedBlockingDeque<Event>(capacity);
        queueRemaining = new Semaphore(capacity);
        queueStored = new Semaphore(0);
      }
    }
 
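If the Channel Queue is not null, this is a dynamic resize; otherwise the Channel Queue is created.
Consider the initial creation first. It happens under queueLock, and every operation on the Channel Queue must hold this lock because, as mentioned earlier, the Channel Queue may be resized dynamically. Then two semaphores are initialized, which transactions use to reserve Channel Queue capacity:
- queueRemaining holds the remaining capacity of the Channel Queue and starts at capacity.
- queueStored holds the number of events stored in the Channel Queue and available to take, and starts at 0.
Next, resizeQueue is called to resize the queue dynamically: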
Java code:
    private void resizeQueue(int capacity) throws InterruptedException {
      int oldCapacity;
      synchronized (queueLock) {
        // First compute the Channel Queue's capacity before the resize
        oldCapacity = queue.size() + queue.remainingCapacity();
      }
      if (oldCapacity == capacity) {
        // The new capacity equals the old one: nothing to do
        return;
      } else if (oldCapacity > capacity) {
        // The old capacity is larger: shrink.
        // First reserve (oldCapacity - capacity) permits so that the removed capacity can no longer be used
        if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
          // If the permits cannot be acquired, a warning is logged and the resize is skipped
        } else {
          // Otherwise shrink and copy the old queue's data; queueLock is held because
          // this sequence of steps must be thread-safe
          synchronized (queueLock) {
            LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
            newQueue.addAll(queue);
            queue = newQueue;
          }
        }
      } else {
        // Not shrinking, so simply grow
        synchronized (queueLock) {
          LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
          newQueue.addAll(queue);
          queue = newQueue;
        }
        // Release (capacity - oldCapacity) queueRemaining permits for the newly added capacity
        queueRemaining.release(capacity - oldCapacity);
      }
    }
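The permit bookkeeping in resizeQueue can be exercised with a simplified, hypothetical model, ResizableQueue. Strings stand in for Events, and a failed resize returns false instead of logging.

In this model, shrinking permanently consumes the permits of the slots being removed, and growing releases permits for the new slots. Acquiring the permits before copying guarantees that the old contents fit into the smaller queue.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of resizeQueue: a bounded deque plus a semaphore
// whose permits count the free slots. Strings stand in for Flume Events.
final class ResizableQueue {
  private final Object queueLock = new Object();
  private LinkedBlockingDeque<String> queue;
  final Semaphore queueRemaining;

  ResizableQueue(int capacity) {
    queue = new LinkedBlockingDeque<>(capacity);
    queueRemaining = new Semaphore(capacity);
  }

  // Simplified, non-transactional put: reserve a free slot first, then insert.
  boolean put(String event) {
    if (!queueRemaining.tryAcquire()) {
      return false;
    }
    synchronized (queueLock) {
      return queue.offer(event); // a permit guarantees a free slot
    }
  }

  int capacity() {
    synchronized (queueLock) {
      return queue.size() + queue.remainingCapacity();
    }
  }

  // Returns true if the queue ends up with the requested capacity.
  boolean resize(int capacity, long timeoutSeconds) {
    int oldCapacity = capacity();
    if (oldCapacity == capacity) {
      return true;
    }
    try {
      // Shrinking: take away the permits of the slots being removed, or give up.
      if (oldCapacity > capacity
          && !queueRemaining.tryAcquire(oldCapacity - capacity, timeoutSeconds, TimeUnit.SECONDS)) {
        return false; // not enough free slots; the queue is left unchanged
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    synchronized (queueLock) {
      LinkedBlockingDeque<String> newQueue = new LinkedBlockingDeque<>(capacity);
      newQueue.addAll(queue);
      queue = newQueue;
    }
    if (capacity > oldCapacity) {
      queueRemaining.release(capacity - oldCapacity); // growing: new slots become usable
    }
    return true;
  }
}
```

Consider a full queue of capacity 2 grown to 4. It gains two permits. A later shrink to 1 fails because three free slots would have to be given up and only two exist. A shrink to 2 succeeds.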
At this point all Channel Queue-related data has been initialized. Next, the start method is called:
Java code:
    public synchronized void start() {
      channelCounter.start();
      channelCounter.setChannelSize(queue.size());
      channelCounter.setChannelCapacity(Long.valueOf(
          queue.size() + queue.remainingCapacity()));
      super.start();
    }
 
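This starts a ChannelCounter. It records metrics such as the number of put and take attempts, the number of successful puts and takes, and the channel's current size and capacity.
As shown earlier, BasicChannelSemantics delegates put and take directly to the transaction, so next we look at how MemoryTransaction is implemented.
First, MemoryTransaction's initialization: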
Java code:
    private class MemoryTransaction extends BasicTransactionSemantics {
      private LinkedBlockingDeque<Event> takeList;
      private LinkedBlockingDeque<Event> putList;
      private final ChannelCounter channelCounter;
      private int putByteCounter = 0;
      private int takeByteCounter = 0;

      public MemoryTransaction(int transCapacity, ChannelCounter counter) {
        putList = new LinkedBlockingDeque<Event>(transCapacity);
        takeList = new LinkedBlockingDeque<Event>(transCapacity);
        channelCounter = counter;
      }
 
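So MemoryTransaction holds the following state:
- two linked blocking deques (takeList and putList), each sized to the transaction capacity;
- a byte counter for each of them;
- a counter for Channel operations.
The put operation of a transaction looks like this:
Java code:
    protected void doPut(Event event) throws InterruptedException {
      // 1. Increment the put-attempt counter
      channelCounter.incrementEventPutAttemptCount();
      // 2. estimateEventSize returns the Event body size in bytes, which is converted to slots.
      //    Note: both operands are integers, so the division truncates before Math.ceil runs
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      // 3. Offer the Event to the transaction's putList; if it is full, throw so the transaction is rolled back
      if (!putList.offer(event)) {
        throw new ChannelException(
            "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      // 4. Add to the put byte counter
      putByteCounter += eventByteSize;
    }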
 
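doPut is fairly simple. It puts the Event into the transaction's putList and throws an exception if the list is full, so that the transaction is rolled back. Otherwise the event waits in putList until commit, when it is moved to the Channel Queue. doPut also adds to the put byte counter so that the byte capacity can be enforced later.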
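The put side can be isolated in a hypothetical model, PutSide, which also makes the slot estimate visible. Raw byte[] bodies stand in for Events, and IllegalStateException stands in for ChannelException. The body length is an assumed stand-in for estimateEventSize.

The model keeps the same `Math.ceil(x / 100)` expression shape as doPut. Because integer division happens first, a 150-byte body counts as 1 slot and a 50-byte body as 0 slots. For comparison, slotsRoundedUp shows a true ceiling.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical model of doPut: byte[] bodies stand in for Events,
// IllegalStateException stands in for Flume's ChannelException.
final class PutSide {
  static final int BYTE_CAPACITY_SLOT_SIZE = 100;

  final LinkedBlockingDeque<byte[]> putList;
  int putByteCounter = 0;

  PutSide(int transCapacity) {
    putList = new LinkedBlockingDeque<>(transCapacity);
  }

  // Same expression shape as doPut; body length stands in for estimateEventSize (assumption).
  // long / int is integer division, so the fraction is already gone when Math.ceil runs.
  static int slotsFor(byte[] body) {
    long size = body.length;
    return (int) Math.ceil(size / BYTE_CAPACITY_SLOT_SIZE);
  }

  // A true round-up, for comparison.
  static int slotsRoundedUp(byte[] body) {
    return (body.length + BYTE_CAPACITY_SLOT_SIZE - 1) / BYTE_CAPACITY_SLOT_SIZE;
  }

  // Bounded offer: a full putList means the caller must roll back.
  void put(byte[] body) {
    if (!putList.offer(body)) {
      throw new IllegalStateException("Put queue of capacity " + putList.size() + " full");
    }
    putByteCounter += slotsFor(body);
  }
}
```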
Next is the take operation of a transaction:
Java code:
    protected Event doTake() throws InterruptedException {
      // 1. Increment the take-attempt counter
      channelCounter.incrementEventTakeAttemptCount();
      // 2. If takeList has no remaining capacity, this transaction has already taken the maximum number of Events
      if (takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      // 3. Try to acquire one queueStored permit; on timeout, return null
      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      // 4. Take an Event from the Channel Queue
      Event event;
      synchronized (queueLock) { // Channel Queue access must hold queueLock because of the dynamic resizing described earlier
        event = queue.poll();
      }
      // 5. The semaphore guarantees the Channel Queue cannot return null here; if it does, something is wrong
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");
      // 6. Stash the Event in the transaction's takeList
      takeList.put(event);
      // 7. Compute the Event's size in slots and add it to the take byte counter
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      takeByteCounter += eventByteSize;
      return event;
    }
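The handshake between queueStored and the Channel Queue can be shown in a hypothetical, simplified model, TakeSide. In this model a permit is released only after the element has been added under the lock. A successful tryAcquire therefore guarantees that poll finds an element, which is the property step 5 relies on.

The publish method is an assumed stand-in for a committed put. The timeout is in milliseconds to keep the example fast.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of doTake's handshake; Strings stand in for Events.
final class TakeSide {
  private final Object queueLock = new Object();
  private final Deque<String> queue = new ArrayDeque<>();   // Channel Queue, guarded by queueLock
  final Semaphore queueStored = new Semaphore(0);           // events available to take
  final Deque<String> takeList = new ArrayDeque<>();        // per-transaction, single thread
  private final int transCapacity;

  TakeSide(int transCapacity) {
    this.transCapacity = transCapacity;
  }

  // Stand-in for a committed put: make the event visible first, then publish one permit.
  void publish(String event) {
    synchronized (queueLock) {
      queue.addLast(event);
    }
    queueStored.release();
  }

  // Like doTake: fail fast when takeList is full, return null on timeout.
  String take(long timeoutMillis) {
    if (takeList.size() >= transCapacity) {
      throw new IllegalStateException("Take list full, consider committing more frequently");
    }
    try {
      if (!queueStored.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
        return null;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
    String event;
    synchronized (queueLock) {
      event = queue.pollFirst();
    }
    if (event == null) {
      throw new IllegalStateException("poll returned null despite a permit");
    }
    takeList.addLast(event);
    return event;
  }
}
```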
 
Next, committing a transaction:
Java code:
    protected void doCommit() throws InterruptedException {
      // 1. Compute the change in the number of Events: takes - puts. If more events were put than taken, the change is negative
      int remainingChange = takeList.size() - putList.size();
      // 2. If remainingChange is negative, permits for the Channel Queue's remaining capacity must be acquired
      if (remainingChange < 0) {
        // 2.1 First acquire putByteCounter byte-capacity permits; failure means the byte capacity limit has been exceeded, so the transaction is rolled back
        if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
          throw new ChannelException("Cannot commit transaction. Byte capacity " +
              "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
              "reached. Please increase heap space/byte capacity allocated to " +
              "the channel as the sinks may not be keeping up with the sources");
        }
        // 2.2 Acquire -remainingChange Channel Queue permits to make room for -remainingChange Events;
        //     if they cannot be acquired, release the putByteCounter byte permits and throw to roll back the transaction
        if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
          bytesRemaining.release(putByteCounter);
          throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
      int puts = putList.size();
      int takes = takeList.size();
      synchronized (queueLock) { // Channel Queue access must always hold queueLock
        if (puts > 0) {
          while (!putList.isEmpty()) { // 3.1 If there are Events, move them into the Channel Queue one by one
            if (!queue.offer(putList.removeFirst())) {
              // 3.2 If offering to the Channel Queue fails, the semaphore accounting is broken; this should never happen
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        // 4. On success, clear putList and takeList
        putList.clear();
        takeList.clear();
      }
      // 5.1 Release takeByteCounter byte-capacity permits
      bytesRemaining.release(takeByteCounter);
      // 5.2 Reset the byte counters
      takeByteCounter = 0;
      putByteCounter = 0;
      // 5.3 Release puts queueStored permits so that doTake can retrieve the new data
      queueStored.release(puts);
      // 5.4 Release remainingChange queueRemaining permits
      if (remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      // 6. Update ChannelCounter statistics
      if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
      }
      channelCounter.setChannelSize(queue.size());
    }
 
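Two semaphores are involved here:
queueStored is the number of events stored in the Channel Queue, that is, events available for taking. It starts at 0 and changes in three places:
- doTake acquires one permit for each Event it takes (-1).
- doCommit releases as many permits as there were events in putList (+N).
- doRollback releases as many permits as there were events in takeList (+N), because those events go back into the Channel Queue.
queueRemaining is the number of free slots in the Channel Queue, that is, how many more events it can hold. It starts at the Channel Queue capacity. At commit time, remainingChange = takeList.size() - putList.size() gives the net change:
- If it is negative, more events were put than taken, so -remainingChange slots are consumed and -remainingChange permits are acquired.
- If it is positive, more events were taken than put, so remainingChange slots are freed and remainingChange permits are released.
In other words, producing events decreases queueRemaining and consuming events increases it.
bytesRemaining is the byte capacity semaphore (counted in slots). If committing would exceed the byte capacity, the transaction is rolled back.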
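The queueRemaining/queueStored arithmetic can be checked with a hypothetical counting model, PermitLedger. It tracks only the two semaphores and the sizes of putList and takeList; byte accounting and the events themselves are left out.

A taken event keeps its queueRemaining slot reserved until commit. This is what guarantees doRollback has room to push taken events back.

```java
import java.util.concurrent.Semaphore;

// Hypothetical model of the queueRemaining/queueStored arithmetic in doCommit.
// Only counts are tracked; byte accounting and the actual events are omitted.
final class PermitLedger {
  final Semaphore queueRemaining;                 // free slots in the Channel Queue
  final Semaphore queueStored = new Semaphore(0); // events available to take
  private int pendingPuts = 0;                    // size of putList
  private int pendingTakes = 0;                   // size of takeList

  PermitLedger(int capacity) {
    queueRemaining = new Semaphore(capacity);
  }

  void put() {
    pendingPuts++; // doPut only touches putList; no permits change yet
  }

  boolean take() {
    if (!queueStored.tryAcquire()) {
      return false; // nothing stored
    }
    pendingTakes++;
    return true;
  }

  // Returns false where doCommit would throw; the caller is then expected to roll back.
  boolean commit() {
    int remainingChange = pendingTakes - pendingPuts;
    if (remainingChange < 0 && !queueRemaining.tryAcquire(-remainingChange)) {
      return false;
    }
    queueStored.release(pendingPuts);          // committed puts become takeable
    if (remainingChange > 0) {
      queueRemaining.release(remainingChange); // net consumption frees slots
    }
    pendingPuts = 0;
    pendingTakes = 0;
    return true;
  }
}
```

With capacity 3, committing two puts leaves 1 free slot and 2 stored events. A transaction that then takes one event and puts two has a net change of -1, so it leaves 0 free slots and 3 stored events. Any further put cannot be committed.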
Finally, rolling back a transaction:
Java code:
    protected void doRollback() {
      int takes = takeList.size();
      synchronized (queueLock) { // Channel Queue access must always hold queueLock
        // 1. Precondition: make sure there is enough room to roll back the transaction
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        // 2. Move the transaction's takeList back into the Channel Queue
        while (!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        putList.clear();
      }
      // 3. Release putByteCounter bytesRemaining permits
      bytesRemaining.release(putByteCounter);
      // 4. Reset the counters
      putByteCounter = 0;
      takeByteCounter = 0;
      // 5. Release as many queueStored (stored-event) permits as there were events in takeList
      queueStored.release(takes);
      channelCounter.setChannelSize(queue.size());
    }
    }
 
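In other words, a rollback puts the events held in takeList back at the head of the Channel Queue and discards putList. It then restores queueStored by releasing as many permits as there were taken events. Because takeList is drained from the tail (removeLast) while the Channel Queue is filled from the head (addFirst), the events end up in their original order.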
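The ordering argument can be checked with a hypothetical model, RollbackModel. Strings stand in for Events, and a simplified take() mirrors doTake.

After taking "a" and "b" from a queue holding a, b, c, the rollback below restores the queue to a, b, c and brings queueStored back to 3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;

// Hypothetical model of doRollback's queue handling; Strings stand in for Events.
final class RollbackModel {
  private final Object queueLock = new Object();
  private final LinkedBlockingDeque<String> queue;
  final Semaphore queueStored = new Semaphore(0);
  final LinkedBlockingDeque<String> takeList = new LinkedBlockingDeque<>();
  final LinkedBlockingDeque<String> putList = new LinkedBlockingDeque<>();

  RollbackModel(int capacity, List<String> initial) {
    queue = new LinkedBlockingDeque<>(capacity);
    queue.addAll(initial);
    queueStored.release(initial.size());
  }

  // Simplified doTake: claim one stored event and stash it in takeList.
  String take() {
    if (!queueStored.tryAcquire()) {
      return null;
    }
    String event;
    synchronized (queueLock) {
      event = queue.pollFirst();
    }
    takeList.addLast(event);
    return event;
  }

  // Simplified doRollback: taken events return to the head in their original order,
  // pending puts are discarded, and the stored-event permits are restored.
  void rollback() {
    int takes = takeList.size();
    synchronized (queueLock) {
      if (queue.remainingCapacity() < takes) {
        throw new IllegalStateException("Not enough space to roll back takes");
      }
      while (!takeList.isEmpty()) {
        queue.addFirst(takeList.removeLast()); // last taken goes back first
      }
      putList.clear();
    }
    queueStored.release(takes);
  }

  List<String> snapshot() {
    synchronized (queueLock) {
      return new ArrayList<>(queue); // head-to-tail order
    }
  }
}
```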
[This is an original article by columnist Zhang Kaitao (张开涛). Author's WeChat public account: 开涛的博客, id: kaitao-1234567]
Article title: Flume Architecture and Source Code Analysis - MemoryChannel Transaction Implementation
                