歡迎您光臨本站 註冊首頁

Kafka Java Producer代碼實例詳解

←手機掃碼閱讀     madbeef @ 2020-06-08 , reply:0

根據業務需要可以使用Kafka提供的Java Producer API進行產生數據,並將產生的數據發送到Kafka對應Topic的對應分區中,入口類為:Producer

Kafka的Producer API主要提供下列三個方法:

  •   public void send(KeyedMessage

    message) 發送單條數據到Kafka集群
  •   public void send(List<KeyedMessage

    > messages) 發送多條數據(數據集)到Kafka集群
  •   public void close() 關閉Kafka連接資源

一、JavaKafkaProducerPartitioner:自定義的數據分區器,功能是:決定輸入的key/value鍵值對的message發送到Topic的那個分區中,返回分區id,範圍:[0,分區數量); 這裡的實現比較簡單,根據key中的數字決定分區的值。具體代碼如下:

  import kafka.producer.Partitioner;  import kafka.utils.VerifiableProperties;    /**   * Created by gerry on 12/21.   */  public class JavaKafkaProducerPartitioner implements Partitioner {      /**     * 無參構造函數     */    public JavaKafkaProducerPartitioner() {      this(new VerifiableProperties());    }      /**     * 構造函數,必須給定     *     * @param properties 上下文     */    public JavaKafkaProducerPartitioner(VerifiableProperties properties) {      // nothings    }      @Override    public int partition(Object key, int numPartitions) {      int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());      return num % numPartitions;    }  }

 

二、 JavaKafkaProducer:通過Kafka提供的API進行數據產生操作的測試類;具體代碼如下:

  import kafka.javaapi.producer.Producer;  import kafka.producer.KeyedMessage;  import kafka.producer.ProducerConfig;  import org.apache.log4j.Logger;    import java.util.Properties;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;  import java.util.concurrent.TimeUnit;  import java.util.concurrent.atomic.AtomicBoolean;  import java.util.concurrent.ThreadLocalRandom;    /**   * Created by gerry on 12/21.   */  public class JavaKafkaProducer {    private Logger logger = Logger.getLogger(JavaKafkaProducer.class);    public static final String TOPIC_NAME = "test";    public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();    public static final int chartsLength = charts.length;        public static void main(String[] args) {      String brokerList = "192.168.187.149:9092";      brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";      brokerList = "192.168.187.146:9092";      Properties props = new Properties();      props.put("metadata.broker.list", brokerList);      /**       * 0表示不等待結果返回
       * 1表示等待至少有一個服務器返回數據接收標識
       * -1表示必須接收到所有的服務器返回標識,及同步寫入
       * */      props.put("request.required.acks", "0");      /**       * 內部發送數據是異步還是同步       * sync:同步, 默認       * async:異步       */      props.put("producer.type", "async");      /**       * 設置序列化的類       * 可選:kafka.serializer.StringEncoder       * 默認:kafka.serializer.DefaultEncoder       */      props.put("serializer.class", "kafka.serializer.StringEncoder");      /**       * 設置分區類       * 根據key進行數據分區       * 默認是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進行分區       * 可選:kafka.serializer.ByteArrayPartitioner ==> 轉換為字節數組後進行hash分區       */      props.put("partitioner.class", "JavaKafkaProducerPartitioner");        // 重試次數      props.put("message.send.max.retries", "3");        // 異步提交的時候(async),併發提交的記錄數      props.put("batch.num.messages", "200");        // 設置緩衝區大小,默認10KB      props.put("send.buffer.bytes", "102400");        // 2. 構建Kafka Producer Configuration上下文      ProducerConfig config = new ProducerConfig(props);        // 3. 構建Producer對象      final Producer

 

三、Pom.xml依賴配置如下

  0.8.2.1org.apache.kafkakafka_2.10${kafka.version}



[madbeef ] Kafka Java Producer代碼實例詳解已經有235次圍觀

http://coctec.com/docs/java/show-post-237522.html