OpenKnowledgeStream manages Kafka entirely through hand-configured Java Properties objects rather than Spring Boot’s auto-configuration. The wiki-change-stream module constructs a KafkaProducer inside KafkaPublish, while the opensearch-wiki-indexer module constructs a KafkaConsumer inside KafkaConsume. Both connect to the same broker and share the same topic, so any broker address change must be applied in both classes.
Producer configuration
The producer is created in KafkaPublish’s constructor and publishes Change objects serialized as JSON.
bootstrap.servers: Comma-separated list of Kafka broker addresses the producer uses to bootstrap its connection to the cluster. Change this to match your Kafka broker host and port.
key.serializer: Serializer class for the record key. Keys are plain strings, so StringSerializer is used.
value.serializer: Serializer class for the record value. Spring Kafka’s JsonSerializer converts each Change POJO to a JSON byte array before sending.
topic: The Kafka topic to which change events are published. This value is hardcoded as the topic field in KafkaPublish.
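The producer settings described above can be sketched as a plain Properties object. This is a minimal illustration, not the contents of KafkaPublish.java: the class name ProducerConfigSketch and the method producerProperties are hypothetical, the serializers are given as fully qualified class-name strings so the sketch compiles without Kafka on the classpath, and the topic name is assumed to be recent_change_stream because the producer and consumer share one topic.

```java
import java.util.Properties;

// Hypothetical sketch; the real KafkaPublish class may be structured differently.
public class ProducerConfigSketch {

    // Assumed from the consumer section: both modules share this topic.
    static final String TOPIC = "recent_change_stream";

    // Builds the three producer properties described above.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Keys are plain strings.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Values are Change POJOs written as JSON by Spring Kafka.
        props.setProperty("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092");
        // Print each configured property for inspection.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        System.out.println("topic=" + TOPIC);
    }
}
```

With kafka-clients and spring-kafka on the classpath, a Properties object like this can be passed to the KafkaProducer constructor that accepts Properties.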
Consumer configuration
The consumer is created in KafkaConsume’s constructor and subscribes to recent_change_stream to receive Change events for indexing into OpenSearch.
bootstrap.servers: Kafka broker address the consumer uses to bootstrap its connection. Must match the broker address used by the producer.
key.deserializer: Deserializer class for the record key. StringDeserializer is the counterpart of the StringSerializer used on the producer side.
value.deserializer: Deserializer class for the record value. Spring Kafka’s JsonDeserializer reconstructs Change POJOs from the JSON byte arrays produced by JsonSerializer.
group.id: The consumer group ID. All consumer instances sharing this ID will cooperatively consume partitions of recent_change_stream.
auto.offset.reset: Determines where to start reading when no committed offset exists for the consumer group. earliest means the consumer will read from the beginning of the topic on first start.
spring.json.trusted.packages: Comma-separated list of packages that JsonDeserializer is permitted to deserialize into. The wildcard * allows all packages, including the Wikicommon.models package where Change lives.
topic: The topic the consumer subscribes to. Set via consumer.subscribe(Arrays.asList("recent_change_stream")) in the constructor.
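The consumer settings described above can be sketched the same way. This is an illustration under stated assumptions, not the contents of KafkaConsume.java: the class name ConsumerConfigSketch and the method consumerProperties are hypothetical, the group ID is a placeholder because the actual value is not given on this page, and the deserializers are written as fully qualified class-name strings so the sketch compiles without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical sketch; the real KafkaConsume class may be structured differently.
public class ConsumerConfigSketch {

    // Topic name taken from the subscribe call described above.
    static final String TOPIC = "recent_change_stream";

    // Builds the consumer properties described above.
    // groupId is supplied by the caller because the project's value is not shown here.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        props.setProperty("group.id", groupId);
        // Read from the start of the topic when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        // Wildcard: allow JsonDeserializer to deserialize into any package.
        props.setProperty("spring.json.trusted.packages", "*");
        return props;
    }

    public static void main(String[] args) {
        // "example-indexer-group" is a placeholder, not the project's group ID.
        Properties props = consumerProperties("localhost:9092", "example-indexer-group");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        System.out.println("topic=" + TOPIC);
    }
}
```

With kafka-clients and spring-kafka on the classpath, a Properties object like this can be passed to the KafkaConsumer constructor that accepts Properties, followed by the subscribe call on the topic.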
Changing the broker address
bootstrap.servers is hardcoded in two separate files. When pointing OpenKnowledgeStream at a non-local Kafka cluster, update the property in both locations:
wiki-change-stream/src/main/java/WikiChangeStream/publish/KafkaPublish.java
opensearch-wiki-indexer/src/main/java/WikiIndexer/consumer/KafkaConsume.java
Replace "localhost:9092" with your broker’s host and port, e.g. "kafka.example.com:9092".
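If editing two source files for every environment becomes error-prone, one possible approach is to resolve the address from a single place in both classes. The sketch below is not something the project does today: the class BrokerAddressSketch, the method resolveBootstrapServers, and the environment variable name KAFKA_BOOTSTRAP_SERVERS are all hypothetical.

```java
import java.util.Map;

// Hypothetical helper; not part of OpenKnowledgeStream.
public class BrokerAddressSketch {

    // Matches the address currently hardcoded in both classes.
    static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    // Returns the value of the (assumed) KAFKA_BOOTSTRAP_SERVERS variable,
    // falling back to the local default when it is unset or blank.
    static String resolveBootstrapServers(Map<String, String> env) {
        String value = env.get("KAFKA_BOOTSTRAP_SERVERS");
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT_BOOTSTRAP_SERVERS;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Resolve against the real process environment.
        System.out.println(resolveBootstrapServers(System.getenv()));
    }
}
```

Both KafkaPublish and KafkaConsume could then set bootstrap.servers from the same resolved value, so a broker change would no longer require touching either file.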