Kafka 使用 Protocol Buffers(简称 Protobuf)作为其消息序列化格式,它是一种高效且可扩展的二进制序列化协议。处理二进制数据时,Protobuf 可以将复杂的数据结构转换为紧凑的二进制格式,从而提高传输效率和减少数据大小。
要在 Kafka 中使用 Protobuf 处理二进制数据,请按照以下步骤操作:
Person 的消息:syntax = "proto3"; message Person { string name = 1; int32 age = 2; bytes avatar = 3; } 在这个例子中,我们定义了一个包含姓名、年龄和头像(avatar)的 Person 消息。头像是一个二进制字段,可以使用 bytes 类型表示。
protoc 编译器根据 .proto 文件生成对应编程语言的代码。例如,为 Java 生成代码:protoc --java_out=. person.proto 这将生成一个名为 PersonOuterClass.java 的文件,其中包含 Person 消息的序列化和反序列化方法。
Person 消息序列化为二进制格式。例如,在 Java 中:import com.example.PersonOuterClass.Person; Person person = Person.newBuilder() .setName("John Doe") .setAge(30) .setAvatar(ByteString.copyFromUtf8("https://example.com/avatar.jpg")) .build(); byte[] serializedPerson = person.toByteArray(); Person 对象。例如,在 Java 中:import com.example.PersonOuterClass.Person; byte[] receivedSerializedPerson = ...; // 从 Kafka 接收到的二进制数据 Person deserializedPerson = Person.parseFrom(receivedSerializedPerson); import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties); producer.send(new ProducerRecord<>("person-topic", person.toByteArray())); import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList("person-topic")); while (true) { ConsumerRecord<String, byte[]> record = consumer.poll(Duration.ofMillis(100)); byte[] receivedSerializedPerson = record.value(); Person deserializedPerson = Person.parseFrom(receivedSerializedPerson); // 处理反序列化后的消息 } 通过以上步骤,你可以在 Kafka 中使用 Protobuf 处理二进制数据。这种处理方式可以提高传输效率,减少数据大小,并使数据结构更加紧凑。