How to create a state store with a HashMap as the value in Kafka Streams?


I need to create a state store with a String key and a HashMap as the value. I tried the two approaches below.

// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")

// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();

StateStoreSupplier avgStore1 = Stores.create("Avgs")


The code compiles without errors, but I get a runtime error:

io.confluent.examples.streams.WordCountProcessorAPI
Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer


Can someone suggest the correct way to create a state store?

If you want to create a state store, you need to provide a serializer and a deserializer class for the type you want to use. In Kafka Streams, there is a single abstraction called Serde that wraps the serializer and deserializer in one class.

If you use .withValues(Class<V> valueClass), the following must hold:

@param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes

Because there is no built-in Serde for HashMap, you need to implement one first (perhaps called HashMapSerde) and pass an instance of it to the method .withValues(Serde<V> valueSerde). Furthermore, you must implement the actual serializer and deserializer for HashMap, too. If you know the generic types of your HashMap, you should specify them (which makes the implementation of the serializer and deserializer much simpler).


Something like this (just a sketch; generic types omitted):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class HashMapSerde implements Serde<HashMap> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        /* put your code here */
    }

    @Override
    public void close() {
        /* put your code here */
    }

    @Override
    public Serializer<HashMap> serializer() {
        return new Serializer<HashMap>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            @Override
            public byte[] serialize(String topic, HashMap data) {
                /* put your code here: turn the map into bytes */
                throw new UnsupportedOperationException("not implemented yet");
            }

            @Override
            public void close() {
                /* put your code here */
            }
        };
    }

    @Override
    public Deserializer<HashMap> deserializer() {
        return new Deserializer<HashMap>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            @Override
            public HashMap deserialize(String topic, byte[] data) {
                /* put your code here: rebuild the map from bytes */
                throw new UnsupportedOperationException("not implemented yet");
            }

            @Override
            public void close() {
                /* put your code here */
            }
        };
    }
}
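For the "put your code here" parts of serialize and deserialize, one possible approach is a simple length-prefixed binary layout. The following is only a sketch that uses nothing but the JDK: the class name HashMapCodec and its methods toBytes and fromBytes are hypothetical helpers, not part of Kafka, and it assumes the map is a HashMap<String, Double> (as in the question) with no null keys or values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Kafka): converts a HashMap<String, Double>
// to bytes and back. Assumes no null keys or values.
final class HashMapCodec {

    private HashMapCodec() { }

    // Layout: entry count (int), then per entry a UTF-encoded key and a double value.
    static byte[] toBytes(HashMap<String, Double> map) {
        if (map == null) {
            return null; // pass null through unchanged
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Double> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeDouble(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads back exactly the layout written by toBytes.
    static HashMap<String, Double> fromBytes(byte[] data) {
        if (data == null) {
            return null;
        }
        HashMap<String, Double> map = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                map.put(key, in.readDouble());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return map;
    }
}
```

With the serde declared as Serde<HashMap<String, Double>>, serialize(topic, data) could then return HashMapCodec.toBytes(data) and deserialize(topic, data) could return HashMapCodec.fromBytes(data). A JSON or Avro based encoding would work just as well; the fixed binary layout here is simply the shortest thing to show.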

If you want to see examples of how to implement (de)serializers and a Serde, have a look at https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java