博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka源码分析之Sender
阅读量:6189 次
发布时间:2019-06-21

本文共 3712 字,大约阅读时间需要 12 分钟。

        Sender为处理发送produce请求至Kafka集群的后台线程。这个线程更新集群元数据,然后发送produce请求至适当的节点。

        首先,我们先看下它的成员变量:

/* the state of each nodes connection */    // 每个节点连接的状态KafkaClient实例client    private final KafkaClient client;    /* the record accumulator that batches records */    // 批量记录的记录累加器RecordAccumulator实例accumulator    private final RecordAccumulator accumulator;    /* the metadata for the client */    // 客户端元数据Metadata实例metadata    private final Metadata metadata;    /* the maximum request size to attempt to send to the server */    // 试图发送到server端的最大请求大小maxRequestSize    private final int maxRequestSize;    /* the number of acknowledgements to request from the server */    // 从server端获得的请求发送的已确认数量acks    private final short acks;    /* the number of times to retry a failed request before giving up */    // 一个失败请求在被放弃之前的重试次数retries    private final int retries;    /* the clock instance used for getting the time */    // 获取时间的时钟Time实例time    private final Time time;    /* true while the sender thread is still running */    // Sender线程运行的标志位,为true表示Sender线程一直在运行    private volatile boolean running;    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */    // 强制关闭的标志位forceClose    private volatile boolean forceClose;    /* metrics */    // 度量指标    private final SenderMetrics sensors;    /* param clientId of the client */    // 客户端的clientId    private String clientId;    /* the max time to wait for the server to respond to the request*/    // 等到server端响应请求的超时时间requestTimeout    private final int requestTimeout;
        既然是一个线程,我们看下它的主要运行逻辑run()方法,代码如下:

/**     * The main run loop for the sender thread     * sender线程的主循环     */    public void run() {        log.debug("Starting Kafka producer I/O thread.");        // main loop, runs until close is called        // 主循环,一直运行直到close被调用        while (running) {// 标志位running为true,则一直循环            try {            	// 调用待参数的run()方法                run(time.milliseconds());            } catch (Exception e) {            	            	// 截获异常后记录err级别log信息,输出异常                log.error("Uncaught error in kafka producer I/O thread: ", e);            }        }        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");        // okay we stopped accepting requests but there may still be        // requests in the accumulator or waiting for acknowledgment,        // wait until these are completed.        // 如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {            try {            	// 调用调用待参数的run()方法继续处理                run(time.milliseconds());            } catch (Exception e) {                log.error("Uncaught error in kafka producer I/O thread: ", e);            }        }                // 如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求        if (forceClose) {            // We need to fail all the incomplete batches and wake up the threads waiting on            // the futures.            this.accumulator.abortIncompleteBatches();        }                // 关闭客户端        try {            this.client.close();        } catch (Exception e) {            log.error("Failed to close network client", e);        }        log.debug("Shutdown of Kafka producer I/O thread has completed.");    }
        Sender线程的主循环在run()方法内,其主要处理逻辑为:

        1、首先进入一个while主循环,当标志位running为true时一直循环,直到close被调用:

              调用带参数的run(long now)方法,处理消息的发送;

        2、当close被调用时,running被设置为false,while主循环退出:

              2.1、如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求,进入另外一个while循环,调用带参数的run(long now)方法,处理尚未发送完的消息的发送;

              2.2、如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求;

              2.3、关闭客户端。

转载地址:http://ztlda.baihongyu.com/

你可能感兴趣的文章
http的应用-编译安装Apache
查看>>
XP上无法应用Win2008的AD组策略解决办法
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
***链路中mtu导致的问题
查看>>
nginx基本配置与参数说明
查看>>
BGP加密穿越ASA分析
查看>>
【深入浅出Node.js系列十三】用Nodejs连接MySQL
查看>>
CentOS上安装LAMP
查看>>
PHP 5.6.15 编译安装
查看>>
Akka学习笔记:Actor消息传递(2)
查看>>
Logstash--grok 正则表达式
查看>>
java 强制关闭win7进程
查看>>
8.10 shell特殊符号cut命令 8.11 sort_wc_uniq命令 8.12 tee_tr_split命令 8.13 shell特殊符号下...
查看>>
Microsoft Dynamics CRM Developer toolkit for visual studio 安装及下载地址
查看>>
CFDA发布GMP《计算机化系统和确认与验证》两个附录
查看>>
mysql远程备份并解决编码问题脚本
查看>>
ajaxterm 网页上的SSH
查看>>
jQuery.extend 函数使用详解
查看>>