
opensearch-wiki-indexer is the module that consumes Wikipedia change events from Kafka and indexes them into OpenSearch. On a fixed 5-second schedule, it polls the recent_change_stream topic and writes each Change document to the wiki-changes OpenSearch index, using the page title as the document ID.

Classes

WikiIndexerStreamApplication

Package: WikiIndexer
Annotations: @SpringBootApplication, @EnableScheduling, @ComponentScan
The application entry point for the indexer module. It bootstraps the Spring context, enables scheduled task execution, and scans the Wikicommon package for shared model beans.
WikiIndexerStreamApplication.java
package WikiIndexer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
@ComponentScan({"Wikicommon"})
public class WikiIndexerStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(WikiIndexerStreamApplication.class, args);
    }

}
| Annotation | Purpose |
| --- | --- |
| @SpringBootApplication | Enables auto-configuration and component scanning |
| @EnableScheduling | Enables detection and execution of @Scheduled methods |
| @ComponentScan({"Wikicommon"}) | Registers the shared Wikicommon package for Spring bean detection |

KafkaConsume

Package: WikiIndexer.consumer
Annotations: @Component, @Slf4j
Manages a KafkaConsumer that polls the recent_change_stream topic and delegates each record to OpensearchIndexer.
KafkaConsume.java
package WikiIndexer.consumer;

import WikiIndexer.index.OpensearchIndexer;
import Wikicommon.models.Change;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

@Component
@Slf4j
public class KafkaConsume {

    private final OpensearchIndexer opensearchIndexer;
    KafkaConsumer<String, Change> consumer;

    KafkaConsume(OpensearchIndexer opensearchIndexer) {
        this.opensearchIndexer = opensearchIndexer;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", JsonDeserializer.class.getName());
        properties.put("group.id", "wiki-indexer");
        properties.put("auto.offset.reset", "earliest");
        properties.put(
                JsonDeserializer.TRUSTED_PACKAGES,
                "*"
        );
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("recent_change_stream"));
    }

    @Scheduled(fixedRate = 5000)
    public void consume() throws Exception {
        log.info("Starting kafka consumer....");

        ConsumerRecords<String, Change> record = consumer.poll(Duration.ofMillis(1000));
        for(ConsumerRecord<String, Change> change : record) {
            opensearchIndexer.index(change.value());
        }
    }
}
Consumer configuration
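| Property | Value |
| --- | --- |
| bootstrap.servers | localhost:9092 |
| key.deserializer | StringDeserializer |
| value.deserializer | JsonDeserializer (Spring Kafka) |
| group.id | wiki-indexer |
| auto.offset.reset | earliest |
| JsonDeserializer.TRUSTED_PACKAGES (spring.json.trusted.packages) | * (all packages trusted for deserialization) |
| Topic (subscription) | recent_change_stream |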
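The auto.offset.reset row is easy to read as "always replay from the beginning". The plain-Java model below sketches the actual rule as a simplification; OffsetResetModel is a hypothetical name, not Kafka client code. The setting is consulted only when the wiki-indexer group has no valid committed offset for a partition. Offsets do get committed in this setup, because the configuration leaves enable.auto.commit at its default of true, so after the first run a restarted indexer resumes from the group's last committed position rather than from the start of the topic.

```java
import java.util.OptionalLong;

// Simplified model (not Kafka client code) of where a consumer in group
// "wiki-indexer" starts reading one partition. auto.offset.reset is consulted
// only when the group has no committed offset for that partition; the real
// client also falls back to it when a committed offset is out of range,
// which this sketch ignores.
class OffsetResetModel {

    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            // A committed position always wins in this model.
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // replay everything still retained
            case "latest":
                return logEndOffset;   // only records produced from now on
            default:
                // Kafka's "none" setting raises an error when no offset exists.
                throw new IllegalStateException(
                        "No committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```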
consume()
@Scheduled(fixedRate = 5000)
public void consume() throws Exception
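Runs every 5 seconds (fixedRate = 5000) and polls Kafka with a 1-second timeout. For each ConsumerRecord in the returned batch, it extracts the Change value and calls opensearchIndexer.index(change.value()). Because the "Starting kafka consumer...." message is logged at the top of this method, it appears on every scheduled run, not only at startup.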
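The scheduling and delegation pattern of consume() can be modeled with the JDK alone. In the sketch below, which is illustrative rather than the module's code, a Queue<String> of page titles stands in for the Kafka topic, a Consumer<String> callback stands in for OpensearchIndexer.index(), and ScheduledExecutorService.scheduleAtFixedRate plays the role of @Scheduled(fixedRate = 5000). FixedRatePollSketch and its methods are hypothetical names. Unlike KafkaConsumer.poll(), which waits up to its timeout (1 second here) when no records are available, pollBatch returns immediately; the batch cap of 500 mirrors Kafka's default max.poll.records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// JDK-only model of the consume() cycle. The queue stands in for the Kafka
// topic and the callback for OpensearchIndexer.index(); neither is the
// module's real API.
class FixedRatePollSketch {

    // Stand-in for consumer.poll(): take up to maxRecords available items.
    static List<String> pollBatch(Queue<String> source, int maxRecords) {
        List<String> batch = new ArrayList<>();
        String next;
        while (batch.size() < maxRecords && (next = source.poll()) != null) {
            batch.add(next);
        }
        return batch;
    }

    // One scheduled run: poll a batch, then hand each element to the indexer.
    static int runOnce(Queue<String> source, int maxRecords, Consumer<String> indexer) {
        List<String> batch = pollBatch(source, maxRecords);
        for (String change : batch) {
            indexer.accept(change);
        }
        return batch.size();
    }

    // Rough analogue of @Scheduled(fixedRate = 5000). scheduleAtFixedRate never
    // runs the task concurrently (a slow run delays the next one), and an
    // exception escaping a run cancels all later runs, so it is caught here.
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler, Queue<String> source,
                                    Consumer<String> indexer, long periodMillis) {
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                runOnce(source, 500, indexer); // 500 = Kafka's default max.poll.records
            } catch (RuntimeException e) {
                System.err.println("Run failed: " + e.getMessage());
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

For example, FixedRatePollSketch.start(Executors.newSingleThreadScheduledExecutor(), queue, title -> System.out.println("Indexed " + title), 5000) would drain the queue every five seconds. Keeping runs on a single thread matters in the real module because KafkaConsumer is not safe for concurrent use.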
TRUSTED_PACKAGES is set to * to allow the JsonDeserializer to deserialize the Wikicommon.models.Change class from the shared wiki-common module without requiring an explicit package allowlist.
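The property behind TRUSTED_PACKAGES, spring.json.trusted.packages, also accepts a comma-delimited list of packages, so a narrower setting such as properties.put(JsonDeserializer.TRUSTED_PACKAGES, "Wikicommon.models") would trust only the shared model package. The hypothetical TrustedPackagesModel below is a simplified stand-in for that allowlist check, not Spring Kafka's implementation: "*" disables the check, while a list admits a class only when its package matches an entry exactly.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a trusted-packages allowlist (not Spring Kafka's code).
class TrustedPackagesModel {

    // Splits a comma-delimited value such as "Wikicommon.models, java.util".
    static List<String> parse(String propertyValue) {
        List<String> result = new ArrayList<>();
        for (String part : propertyValue.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // True when the class may be deserialized under the given allowlist.
    static boolean isTrusted(String className, List<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true; // "*" trusts every package
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }
}
```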

OpensearchIndexer

Package: WikiIndexer.index
Annotations: @Service, @RequiredArgsConstructor, @Slf4j
Writes a single Change document into the wiki-changes OpenSearch index.

| Field Name | Type | Description |
| --- | --- | --- |
| openSearchClient | OpenSearchClient | Injected Spring bean declared by OpensearchConfig |
index(Change change)
OpensearchIndexer.java
public void index(Change change) throws Exception {
    openSearchClient.index(i -> i
            .index("wiki-changes")
            .id(change.getTitle())
            .document(change));
    log.info("Indexed Title: {}", change.getTitle());
}
Issues an index request against the wiki-changes index, using the page title as the document ID. Logs the indexed title on success.
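The index call above corresponds to the OpenSearch REST Index API with an explicit ID, PUT /<index>/_doc/<id>. In the module, the opensearch-java client builds this request; the sketch below uses only the JDK's java.net.http package to show the request shape for one page title. IndexRequestSketch is a hypothetical name, and the JSON body is passed in pre-serialized, whereas the module lets JacksonJsonpMapper serialize the Change object. When issuing the call by hand, titles such as "AC/DC" contain characters that are not safe in a URL path, so the ID must be percent-encoded.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Builds (but does not send) the REST request equivalent to indexing one
// document under an explicit ID.
class IndexRequestSketch {

    // Percent-encode the title as a single path segment. URLEncoder targets
    // form encoding, so its "+" for spaces is rewritten to "%20"; a literal
    // "+" in the input has already become "%2B" and is unaffected.
    static String encodePathSegment(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static HttpRequest indexRequest(String baseUrl, String index, String id, String jsonBody) {
        URI uri = URI.create(baseUrl + "/" + index + "/_doc/" + encodePathSegment(id));
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

Sending it would take HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running cluster at localhost:9200.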
Using change.getTitle() as the document ID means that repeated edits to the same Wikipedia page will overwrite the existing document rather than create a new one. This keeps the index at one document per page.
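To make the overwrite behavior concrete, the hypothetical TitleKeyedIndexModel below keeps documents in a HashMap keyed by title. It is an in-memory sketch, not OpenSearch, but it follows the Index API's visible behavior: the first write for an ID reports created with _version 1, and each later write to the same ID replaces the document, reports updated, and increments _version.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of index-by-ID semantics (not OpenSearch code).
class TitleKeyedIndexModel {

    record Stored(String json, long version) {}

    // Mirrors the "result" and "_version" fields of an Index API response.
    record IndexResult(String result, long version) {}

    private final Map<String, Stored> docs = new HashMap<>();

    // Index a document under its title; an existing document is replaced.
    IndexResult index(String title, String json) {
        Stored previous = docs.get(title);
        long version = previous == null ? 1 : previous.version() + 1;
        docs.put(title, new Stored(json, version));
        return new IndexResult(previous == null ? "created" : "updated", version);
    }

    String get(String title) {
        Stored stored = docs.get(title);
        return stored == null ? null : stored.json();
    }

    int size() {
        return docs.size();
    }
}
```

Two edits to the same page therefore leave one document holding only the later change; keeping a history of edits would require a different ID scheme.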

OpensearchConfig

Package: WikiIndexer.config
Annotation: @Configuration
Declares the OpenSearchClient bean used by OpensearchIndexer.
OpensearchConfig.java
package WikiIndexer.config;

import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.apache.http.HttpHost;
import org.opensearch.client.RestClient;
import org.opensearch.client.transport.OpenSearchTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OpensearchConfig {

    @Bean
    public OpenSearchClient openSearchClient() {
        RestClient restClient =
                RestClient.builder(
                        new HttpHost("localhost", 9200)
                ).build();

        OpenSearchTransport transport =
                new RestClientTransport(
                        restClient,
                        new JacksonJsonpMapper()
                );

        return new OpenSearchClient(transport);
    }
}
Client construction chain
HttpHost("localhost", 9200)
  └─► RestClient
        └─► RestClientTransport (+ JacksonJsonpMapper)
              └─► OpenSearchClient
The JacksonJsonpMapper handles JSON serialization and deserialization for all OpenSearch request and response bodies.

Maven Artifact

Group ID: com.as
Artifact ID: opensearch-wiki-indexer
Version: 0.0.1-SNAPSHOT
pom.xml
<dependency>
    <groupId>com.as</groupId>
    <artifactId>opensearch-wiki-indexer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
Key dependencies

wiki-common

Shared data model classes (Change, Query, RecentChanges)

opensearch-java 2.19.0

High-level Java client for OpenSearch index operations

opensearch-rest-client 2.19.0

Low-level REST transport layer for the OpenSearch client
