OpenKnowledgeStream is a Maven multi-module project. The root POM (com.as:OpenKnowledgeStream:0.0.1-SNAPSHOT) declares shared dependencies and build configuration that all three child modules inherit. Each child module defines only its module-specific additions. The project targets Java 21 at the root level; the child modules each set maven.compiler.source and maven.compiler.target to 26.

Maven module hierarchy

<!-- Root pom.xml — com.as:OpenKnowledgeStream:0.0.1-SNAPSHOT -->
<modules>
    <module>opensearch-wiki-indexer</module>
    <module>wiki-change-stream</module>
    <module>wiki-common</module>
</modules>
The root POM inherits from org.springframework.boot:spring-boot-starter-parent:4.1.0 and declares the following shared dependencies available to all child modules:
<!-- Shared dependencies (root pom.xml) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webmvc</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>2.19.0</version>
</dependency>
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-rest-client</artifactId>
    <version>2.19.0</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5</artifactId>
</dependency>

wiki-change-stream

Artifact ID: wiki-change-stream
Group ID: com.as
Main class: WikiChangeStream.OpenKnowledgeStreamApplication
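This is the pipeline entry point. It hosts the Wikipedia polling loop and the Kafka producer. Its main class enables scheduling and component-scans the packages of all three modules (wiki-common and opensearch-wiki-indexer are on its classpath as Maven dependencies), so the indexer's consumer and OpenSearch beans run in the same application context.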

Main class

// WikiChangeStream/OpenKnowledgeStreamApplication.java
@SpringBootApplication
@EnableScheduling
@ComponentScan({"com.as", "WikiIndexer", "WikiIndexer.models", "Wikicommon", "WikiChangeStream"})
public class OpenKnowledgeStreamApplication {
    public static void main(String[] args) {
        SpringApplication.run(OpenKnowledgeStreamApplication.class, args);
    }
}
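Component scanning treats each listed base package as a prefix: a class is a candidate when its package equals a base package or sits anywhere beneath it. The stdlib-only sketch below models just that rule. It is an illustration, not Spring's code, since the real scanner reads class files from the classpath and applies filters. It uses the package list from the annotation above plus a couple of example class names, and suggests that WikiIndexer.models is already covered by WikiIndexer, so listing it separately appears to be redundant.

```java
import java.util.List;

public class ScanCoverage {
    // Returns the package part of a fully qualified class name ("" for the default package).
    static String packageOf(String className) {
        int dot = className.lastIndexOf('.');
        return dot < 0 ? "" : className.substring(0, dot);
    }

    // A class is covered when its package equals a base package or is nested under it.
    static boolean isCovered(String className, List<String> basePackages) {
        String pkg = packageOf(className);
        for (String base : basePackages) {
            if (pkg.equals(base) || pkg.startsWith(base + ".")) {
                return true;
            }
        }
        return false;
    }

    // A base package is redundant when another listed base package already contains it.
    static boolean isRedundant(String base, List<String> basePackages) {
        for (String other : basePackages) {
            if (!other.equals(base) && base.startsWith(other + ".")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> bases = List.of("com.as", "WikiIndexer", "WikiIndexer.models",
                "Wikicommon", "WikiChangeStream");
        System.out.println(isCovered("WikiIndexer.consumer.KafkaConsume", bases)); // true
        System.out.println(isCovered("WikiIndexerX.Foo", bases));                  // false
        System.out.println(isRedundant("WikiIndexer.models", bases));              // true
    }
}
```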

Key classes

AppConfig

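Spring @Configuration class in package WikiChangeStream.config. Creates the WebClient bean with a base URL that already carries the full Recent Changes query (list=recentchanges, up to 100 entries via rclimit=100, with the title, tags and ids properties) and a default accept: application/json header.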
// WikiChangeStream/config/AppConfig.java
@Bean
public WebClient webClient() {
    return WebClient.builder()
            .baseUrl("https://en.wikipedia.org/w/api.php" +
                     "?action=query&list=recentchanges" +
                     "&format=json&rclimit=100" +
                     "&rcprop=title|tags|ids")
            .defaultHeader("accept", "application/json")
            .build();
}
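AppConfig concatenates the whole query string into the base URL by hand. As a sketch of the same request, assuming the JDK's java.net.http API in place of WebClient, the parameters can be kept in a map and percent-encoded, so the | separators in rcprop go out as %7C. RecentChangesRequest is a hypothetical name, and the request is only built here, never sent.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RecentChangesRequest {
    static final String API = "https://en.wikipedia.org/w/api.php";

    // Same parameters AppConfig concatenates into its base URL, in the same order.
    static Map<String, String> params() {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("action", "query");
        p.put("list", "recentchanges");
        p.put("format", "json");
        p.put("rclimit", "100");
        p.put("rcprop", "title|tags|ids");
        return p;
    }

    // Percent-encodes each value; URLEncoder turns '|' into %7C.
    static URI buildUri(Map<String, String> params) {
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return URI.create(API + "?" + query);
    }

    // Builds (but does not send) a GET with the same accept header AppConfig sets.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(buildUri(params()))
                .header("accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        System.out.println(buildRequest().uri());
    }
}
```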

WikipediaClient

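Spring @Component in package WikiChangeStream.clients. Wraps the injected WebClient and exposes a single getRecentChanges() method that blocks the calling thread until the response has been received and deserialized into a Query. HTTP 429 is mapped to a TooManyRequests exception; any other 4xx or 5xx status surfaces as the WebClientResponseException that retrieve() raises by default.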
// WikiChangeStream/clients/WikipediaClient.java
@Component
@RequiredArgsConstructor
public class WikipediaClient {
    private final WebClient webClient;

    public Query getRecentChanges() {
        return this.webClient
                .get()
                .retrieve()
                .onStatus(httpStatusCode -> httpStatusCode.value() == 429, clientResponse ->
                        Mono.error(new TooManyRequests("Too many requests...")))
                .bodyToMono(Query.class)
                .block();
    }
}
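For comparison, here is a blocking client with the same shape written against java.net.http.HttpClient instead of WebClient. It is a sketch under stated assumptions: it returns the raw JSON string rather than a Query, and TooManyRequestsException is a hypothetical stand-in for the project's TooManyRequests. The exception is unchecked, as the original must be too, because getRecentChanges() declares no throws clause. The status check is a separate pure method so it can be exercised without network access.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BlockingRecentChangesClient {
    // Hypothetical unchecked exception standing in for the project's TooManyRequests.
    static class TooManyRequestsException extends RuntimeException {
        TooManyRequestsException(String message) {
            super(message);
        }
    }

    // Maps 429 to TooManyRequestsException and any other 4xx/5xx to a generic error.
    static String checkStatus(int status, String body) {
        if (status == 429) {
            throw new TooManyRequestsException("Too many requests...");
        }
        if (status >= 400) {
            throw new IllegalStateException("Unexpected HTTP status " + status);
        }
        return body;
    }

    private final HttpClient http = HttpClient.newHttpClient();
    private final URI uri;

    BlockingRecentChangesClient(URI uri) {
        this.uri = uri;
    }

    // Blocks the calling thread until the full body has arrived, like Mono.block().
    String getRecentChangesJson() throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("accept", "application/json")
                .GET()
                .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        return checkStatus(response.statusCode(), response.body());
    }
}
```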

OpenStream

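Spring @Service in package WikiChangeStream.service. Drives the polling loop via @Scheduled(fixedRate = 5000) and publishes each Change returned by WikipediaClient through KafkaPublish. On TooManyRequests it sleeps the scheduler thread for 5 seconds; any other exception is logged and the next scheduled run simply tries again. With Spring Boot's default single-threaded task scheduler, that sleep also delays every other @Scheduled method in the context, including KafkaConsume.consume().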
// WikiChangeStream/service/OpenStream.java
@Service
@Slf4j
public class OpenStream {
    private final WikipediaClient wikipediaClient;
    private final KafkaPublish kafkaPublish;

    OpenStream(WikipediaClient wikipediaClient, KafkaPublish kafkaPublish) {
        this.wikipediaClient = wikipediaClient;
        this.kafkaPublish = kafkaPublish;
    }

    @Scheduled(fixedRate = 5000)
    private void stream() {
        try {
            Query query = wikipediaClient.getRecentChanges();
            for (Change change : query.getQuery().getRecentChanges()) {
                kafkaPublish.publish(change);
            }
        } catch (TooManyRequests ex) {
            log.warn("Request limit hit...sleeping for 5 seconds...");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } catch (Exception ex) {
            log.error("Exception occurred: {}", ex.getMessage());
        }
    }
}
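Because the backoff sleeps inside the scheduled method, it stretches the current run instead of skipping future ones. The simplified timeline model below assumes the fixed-rate semantics of the JDK's ScheduledThreadPoolExecutor, to which Spring's ThreadPoolTaskScheduler delegates fixed-rate tasks: run n is due at n * period, and a late run delays the next start but never overlaps it. The durations are made-up inputs (about 100 ms of work, plus a 5-second sleep in run 1).

```java
import java.util.ArrayList;
import java.util.List;

public class FixedRateTimeline {
    // Models fixed-rate scheduling on one thread: run n is due at n * period,
    // but cannot start before run n-1 has finished (runs never overlap).
    static List<Long> startTimes(long periodMs, long[] durationsMs) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int n = 0; n < durationsMs.length; n++) {
            long due = n * periodMs;
            long start = Math.max(due, previousEnd);
            starts.add(start);
            previousEnd = start + durationsMs[n];
        }
        return starts;
    }

    public static void main(String[] args) {
        // Run 1 hits a 429 and sleeps 5000 ms on top of ~100 ms of work.
        long[] durations = {100, 5100, 100, 100};
        System.out.println(startTimes(5000, durations)); // [0, 5000, 10100, 15000]
    }
}
```

The run due at 10000 ms starts at 10100 ms; after that the schedule is back on its original 5-second grid.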

KafkaPublish

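Spring @Component in package WikiChangeStream.publish. Constructs a KafkaProducer<String, Change> in its constructor and exposes publish(Change), which sends each change to the recent_change_stream topic without a key, using JsonSerializer for the value. A Callback logs the page title on success or the exception message on failure. Because the producer is built by hand from a Properties object with bootstrap.servers hard-coded to localhost:9092, Spring Boot's spring.kafka.* configuration properties do not apply to it.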
// WikiChangeStream/publish/KafkaPublish.java
@Component
@Slf4j
public class KafkaPublish {
    private final String topic = "recent_change_stream";
    KafkaProducer<String, Change> producer = null;

    KafkaPublish() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer",   StringSerializer.class.getName());
        properties.put("value.serializer", JsonSerializer.class.getName());
        producer = new KafkaProducer<>(properties);
    }

    public void publish(Change recentChange) {
        ProducerRecord<String, Change> record = new ProducerRecord<>(topic, recentChange);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    log.info("Change published with title: {}", recentChange.getTitle());
                } else {
                    log.error("Exception occurred while publish: {}", exception.getMessage());
                }
            }
        });
    }
}
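KafkaPublish creates each ProducerRecord without a key, so the producer chooses the partition on its own, and two changes to the same page may land on different partitions (Kafka orders records only within a partition). If per-page ordering mattered, a hypothetical variant could key each record by title; for keyed records, the Kafka Java client's default partitioning hashes the serialized key with murmur2. The sketch below reimplements that mapping with the standard library for illustration only; the partition count of 3 is an assumption, not the real topic's configuration.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitioning {
    // Murmur2 hash in the form used by the Kafka Java client for keyed records.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a String key serialized as UTF-8, as StringSerializer does by default.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p = partitionFor("Example article", 3);
        System.out.println("Example article -> partition " + p);
    }
}
```

The same title always maps to the same partition, which is what would preserve per-page ordering; the trade-off is that a few very active pages could skew load toward one partition.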

Module-specific Maven dependencies

<!-- wiki-change-stream/pom.xml -->
<dependencies>
    <!-- Reactive WebClient for Wikipedia polling -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- Shared data models -->
    <dependency>
        <groupId>com.as</groupId>
        <artifactId>wiki-common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
    <!-- Indexer beans component-scanned at runtime -->
    <dependency>
        <groupId>com.as</groupId>
        <artifactId>opensearch-wiki-indexer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
</dependencies>
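spring-boot-starter-webflux is declared here and not in the root POM because only wiki-change-stream uses the reactive WebClient; the other two modules do not need the Netty-based reactive stack. Since spring-boot-starter-web is also inherited, Spring Boot still auto-configures this application as a Spring MVC (servlet) application, and WebFlux is used only for its WebClient.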

opensearch-wiki-indexer

Artifact ID: opensearch-wiki-indexer
Group ID: com.as
Main class: WikiIndexer.WikiIndexerStreamApplication
This module is the pipeline sink. It subscribes to the Kafka topic and writes each record into OpenSearch. It can be used either as a standalone Spring Boot application or as a library module component-scanned by wiki-change-stream.

Main class

// WikiIndexer/WikiIndexerStreamApplication.java
@SpringBootApplication
@EnableScheduling
@ComponentScan({"Wikicommon"})
public class WikiIndexerStreamApplication {
    public static void main(String[] args) {
        SpringApplication.run(WikiIndexerStreamApplication.class, args);
    }
}

Key classes

OpensearchConfig

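Spring @Configuration class in package WikiIndexer.config. Constructs the OpenSearchClient bean from the low-level OpenSearch RestClient (opensearch-rest-client, built on Apache HttpAsyncClient) pointed at localhost:9200 over plain HTTP with no authentication, wrapped in a RestClientTransport that uses a JacksonJsonpMapper for serialization.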
// WikiIndexer/config/OpensearchConfig.java
@Bean
public OpenSearchClient openSearchClient() {
    RestClient restClient = RestClient.builder(
            new HttpHost("localhost", 9200)
    ).build();

    OpenSearchTransport transport = new RestClientTransport(
            restClient,
            new JacksonJsonpMapper()
    );

    return new OpenSearchClient(transport);
}

KafkaConsume

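Spring @Component in package WikiIndexer.consumer. Constructs a KafkaConsumer<String, Change> in its constructor, joins the wiki-indexer consumer group, and subscribes to recent_change_stream. Every 5 seconds (@Scheduled(fixedRate = 5000)) it calls poll() with a 1-second timeout and indexes each record it receives. It uses auto.offset.reset=earliest and sets JsonDeserializer.TRUSTED_PACKAGES to "*", so the deserializer accepts whatever target class the record's type headers name.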
// WikiIndexer/consumer/KafkaConsume.java
@Component
@Slf4j
public class KafkaConsume {
    private final OpensearchIndexer opensearchIndexer;
    KafkaConsumer<String, Change> consumer;

    KafkaConsume(OpensearchIndexer opensearchIndexer) {
        this.opensearchIndexer = opensearchIndexer;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer",  StringDeserializer.class.getName());
        properties.put("value.deserializer", JsonDeserializer.class.getName());
        properties.put("group.id",           "wiki-indexer");
        properties.put("auto.offset.reset",  "earliest");
        properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("recent_change_stream"));
    }

    @Scheduled(fixedRate = 5000)
    public void consume() throws Exception {
        log.info("Starting kafka consumer....");
        ConsumerRecords<String, Change> record =
                consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, Change> change : record) {
            opensearchIndexer.index(change.value());
        }
    }
}
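With auto.offset.reset=earliest, a brand-new wiki-indexer group (no committed offsets) begins at the oldest record still retained in recent_change_stream, so a first start indexes the retained backlog rather than only new changes. Once offsets are committed (the consumer's default enable.auto.commit=true commits them periodically), restarts resume from the committed position. The simplified per-partition model below is an illustration, not Kafka's code, and the offsets in it are made-up.

```java
import java.util.OptionalLong;

public class StartingOffset {
    // Simplified model of where a consumer starts reading one partition.
    // A committed offset for the group wins when it is still inside the log;
    // otherwise auto.offset.reset decides ("earliest" = log start, "latest" = log end).
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, String autoOffsetReset) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                // "none" makes the real consumer throw instead of choosing a position;
                // other policies are not modeled here.
                throw new IllegalStateException("No valid offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // New group, no committed offset: "earliest" starts at the log start.
        System.out.println(startingOffset(OptionalLong.empty(), 40, 100, "earliest")); // 40
    }
}
```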

OpensearchIndexer

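Spring @Service in package WikiIndexer.index. Accepts a Change and indexes it into the wiki-changes OpenSearch index with the page title as the document ID, so a later change to the same page replaces the earlier document (create-or-replace rather than a partial update).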
// WikiIndexer/index/OpensearchIndexer.java
@Service
@RequiredArgsConstructor
@Slf4j
public class OpensearchIndexer {
    private final OpenSearchClient openSearchClient;

    public void index(Change change) throws Exception {
        openSearchClient.index(i -> i
                .index("wiki-changes")
                .id(change.getTitle())
                .document(change));
        log.info("Indexed Title: {}", change.getTitle());
    }
}
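Using the title as the ID means repeated edits to one page overwrite a single document, so the index holds the latest change per title rather than a history. In REST terms, indexing with an explicit ID corresponds to PUT /<index>/_doc/<id>. Titles routinely contain spaces and can contain /, so the ID has to travel as a percent-encoded path segment. The client handles this itself; the stdlib sketch below only shows what that encoding looks like, and docPath is a hypothetical helper, not part of opensearch-java.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class DocIdPath {
    // Encodes a value for use as a single URL path segment.
    // URLEncoder targets form data, so its '+' for spaces is swapped for %20
    // (a literal '+' in the input has already become %2B at that point).
    static String encodePathSegment(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Hypothetical helper: REST path of an index-by-ID request.
    static String docPath(String index, String id) {
        return "/" + encodePathSegment(index) + "/_doc/" + encodePathSegment(id);
    }

    public static void main(String[] args) {
        System.out.println(docPath("wiki-changes", "Example article")); // /wiki-changes/_doc/Example%20article
        System.out.println(docPath("wiki-changes", "AC/DC"));           // /wiki-changes/_doc/AC%2FDC
    }
}
```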

Module-specific Maven dependencies

<!-- opensearch-wiki-indexer/pom.xml -->
<dependencies>
    <!-- Shared data models -->
    <dependency>
        <groupId>com.as</groupId>
        <artifactId>wiki-common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
    <!-- OpenSearch Java client and REST client are inherited from root POM -->
</dependencies>
The opensearch-java and opensearch-rest-client artifacts (both version 2.19.0) are declared in the root POM and are therefore available to this module without being re-declared.

wiki-common

Artifact ID: wiki-common
Group ID: com.as
Main class: none — packaged as a plain JAR
This is a pure library module. The Spring Boot Maven plugin is configured with <skip>true</skip>, so the JAR is not repackaged as an executable fat JAR and the other modules can consume it as an ordinary dependency. It contains three Lombok @Data model classes that mirror the Wikipedia Recent Changes API JSON structure.

Key classes

Change

The document model published to Kafka and indexed in OpenSearch. Represents a single Wikipedia page change event. The pageId field is mapped from the JSON key pageid via @JsonProperty.
// Wikicommon/models/Change.java
@Data
public class Change {
    private String type;
    private String title;
    @JsonProperty("pageid")
    private Long pageId;
    private List<String> tags;
}

RecentChanges

Wrapper around the list of changes. Maps the JSON key recentchanges to a List<Change> via @JsonProperty.
// Wikicommon/models/RecentChanges.java
@Data
public class RecentChanges {
    @JsonProperty("recentchanges")
    List<Change> recentChanges;
}

Query

Top-level deserialization target for the Wikipedia API response. Contains a single RecentChanges field named query, matching the outer query key in the API JSON envelope.
// Wikicommon/models/Query.java
@Data
public class Query {
    private RecentChanges query;
}

JSON deserialization mapping

The three models together mirror this Wikipedia API response structure (abridged to the fields the models declare):
{
  "query": {
    "recentchanges": [
      {
        "type": "edit",
        "title": "Example article",
        "pageid": 12345,
        "tags": ["mobile edit", "mobile web edit"]
      }
    ]
  }
}
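The nesting explains the double query in OpenStream (query.getQuery().getRecentChanges()): the outer Query object is the whole response, and its query field holds the RecentChanges wrapper. The sketch below mirrors the three models with Java records, a stdlib stand-in for the Lombok @Data classes with the Jackson annotations left out, and walks the same path using the sample values above.

```java
import java.util.List;

public class RecentChangesShape {
    // Records standing in for the Lombok @Data models (Jackson annotations omitted).
    record Change(String type, String title, Long pageId, List<String> tags) {}
    record RecentChanges(List<Change> recentChanges) {}
    record Query(RecentChanges query) {}

    // Outer Query -> its "query" field -> the list of changes -> titles.
    static List<String> titles(Query response) {
        return response.query().recentChanges().stream().map(Change::title).toList();
    }

    public static void main(String[] args) {
        Query response = new Query(new RecentChanges(List.of(
                new Change("edit", "Example article", 12345L, List.of("mobile edit", "mobile web edit")))));
        System.out.println(titles(response)); // [Example article]
    }
}
```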

Module-specific Maven dependencies

<!-- wiki-common/pom.xml -->
<!-- No intra-project dependencies.
     jackson-databind and lombok are inherited from the root POM. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <!-- Prevent repackaging as executable fat JAR -->
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
