本文共 3202 字,大约阅读时间需要 10 分钟。
send消息:
@Override public Futuresend(ProducerRecord record, Callback callback) { // 如果拦截器为null,就直接拿封装的ProducerRecord //否则就动用拦截器,对发送的消息进行拦截 ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
如果我们自定义了拦截器,就调用我们实现的onSend方法,走我们的逻辑。
接下来要调用同步阻塞方法waitOnMetadata()方法,去等待获取到目标Topic对应的元数据。如果客户端尚未缓存到元数据,就一定要发送网络请求到Broker,去拉取那个Topic的元数据回来。
//等待获取元数据long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);//剩余可阻塞的时间:如果等待获取元数据时,已经阻塞了一段时间了。那么接下来的阻塞,时间就会变短long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
要想往一个Topic里发消息,必须知道这个Topic的元数据:这个Topic有哪些分区、根据Partitioner组件选择一个分区、这个分区对应的Leader在哪个Broker上,然后才能和Broker建立连接、发送消息。
这一次获取到元数据,再以后就可以直接根据缓存来发送了。
序列化key和value:
//将key序列化成字节数组 byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer"); } //将value序列化成字节数组 byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); }
key、value可以是各种类型的:String、Double、Boolean…或者是自定义的对象。但是如果想发送消息给Broker,就必须将key、value都序列化成字节数组
然后基于获取到的Topic元数据,根据Partitioner组件获取消息对应的分区
//使用Partitioner组件获取消息对应的分区int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
然后对我们要发送的这条消息进行安全检查,是否超出了请求的最大值、内存缓冲的最大值:
//检查要发送的这条消息,是否超出了请求的最大大小、内存缓冲的最大大小ensureValidRecordSize(serializedSize);
根据拦截器来确定Callback:
//如果我们自定义了拦截器,就为拦截器创建Callback函数Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
这个callback就是我们在调用producer.send()方法时,传进来的:
producer.send(record,new Callback(){......})
在消息发送完成之后,回调我们自定义实现的Callback函数,通知我们消息发送的结果。
然后进行最关键的一步:将消息添加到内存缓冲里去(RecordAccumulator组件负责的)
//将消息添加到内存缓冲里去,RecordAccumulator组件负责的RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
如果某个分区对应的batch满了、或者有一个新batch被创建出来了,就唤醒Sender线程去工作(发送batch):
//如果batch已经满了,或者有一个新batch被创建了 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); //唤醒Sender线程去工作 this.sender.wakeup(); }
转载地址:http://cdbq.baihongyu.com/