根據業務需要可以使用Kafka提供的Java Producer API進行產生數據,並將產生的數據發送到Kafka對應Topic的對應分區中,入口類為:Producer
Kafka的Producer API主要提供下列三個方法:
public void send(KeyedMessage
public void send(List<KeyedMessage
public void close() 關閉Kafka連接資源
一、JavaKafkaProducerPartitioner:自定義的數據分區器,功能是:決定輸入的key/value鍵值對的message發送到Topic的那個分區中,返回分區id,範圍:[0,分區數量); 這裡的實現比較簡單,根據key中的數字決定分區的值。具體代碼如下:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; /** * Created by gerry on 12/21. */ public class JavaKafkaProducerPartitioner implements Partitioner { /** * 無參構造函數 */ public JavaKafkaProducerPartitioner() { this(new VerifiableProperties()); } /** * 構造函數,必須給定 * * @param properties 上下文 */ public JavaKafkaProducerPartitioner(VerifiableProperties properties) { // nothings } @Override public int partition(Object key, int numPartitions) { int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim()); return num % numPartitions; } }
二、 JavaKafkaProducer:通過Kafka提供的API進行數據產生操作的測試類;具體代碼如下:
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.log4j.Logger; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ThreadLocalRandom; /** * Created by gerry on 12/21. */ public class JavaKafkaProducer { private Logger logger = Logger.getLogger(JavaKafkaProducer.class); public static final String TOPIC_NAME = "test"; public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray(); public static final int chartsLength = charts.length; public static void main(String[] args) { String brokerList = "192.168.187.149:9092"; brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095"; brokerList = "192.168.187.146:9092"; Properties props = new Properties(); props.put("metadata.broker.list", brokerList); /** * 0表示不等待結果返回 * 1表示等待至少有一個服務器返回數據接收標識 * -1表示必須接收到所有的服務器返回標識,及同步寫入 * */ props.put("request.required.acks", "0"); /** * 內部發送數據是異步還是同步 * sync:同步, 默認 * async:異步 */ props.put("producer.type", "async"); /** * 設置序列化的類 * 可選:kafka.serializer.StringEncoder * 默認:kafka.serializer.DefaultEncoder */ props.put("serializer.class", "kafka.serializer.StringEncoder"); /** * 設置分區類 * 根據key進行數據分區 * 默認是:kafka.producer.DefaultPartitioner ==> 安裝key的hash進行分區 * 可選:kafka.serializer.ByteArrayPartitioner ==> 轉換為字節數組後進行hash分區 */ props.put("partitioner.class", "JavaKafkaProducerPartitioner"); // 重試次數 props.put("message.send.max.retries", "3"); // 異步提交的時候(async),併發提交的記錄數 props.put("batch.num.messages", "200"); // 設置緩衝區大小,默認10KB props.put("send.buffer.bytes", "102400"); // 2. 構建Kafka Producer Configuration上下文 ProducerConfig config = new ProducerConfig(props); // 3. 構建Producer對象 final Producer
三、Pom.xml依賴配置如下
0.8.2.1org.apache.kafkakafka_2.10${kafka.version}
[madbeef ] Kafka Java Producer代碼實例詳解已經有235次圍觀