wiki-change-stream is the Spring Boot module that periodically polls the Wikipedia Recent Changes API and publishes each change event to Kafka. It runs on a fixed 5-second schedule. Each poll fetches the 100 most recent changes and forwards each one as a JSON-serialized Change record to the recent_change_stream topic.

Classes

OpenKnowledgeStreamApplication

Package: WikiChangeStream
The application entry point. It bootstraps the Spring context, enables scheduled task execution, and lists the packages to scan for components.
OpenKnowledgeStreamApplication.java
package WikiChangeStream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
@ComponentScan({"com.as", "WikiIndexer", "WikiIndexer.models", "Wikicommon", "WikiChangeStream"})
public class OpenKnowledgeStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(OpenKnowledgeStreamApplication.class, args);
    }

}
Annotation | Purpose
@SpringBootApplication | Enables auto-configuration and component scanning
@EnableScheduling | Enables detection of @Scheduled methods such as OpenStream.stream()
@ComponentScan | Scans the com.as, WikiIndexer, WikiIndexer.models, Wikicommon, and WikiChangeStream packages

OpenStream

Package: WikiChangeStream.service
Annotations: @Service, @Slf4j
The core polling service. It calls WikipediaClient on a fixed 5-second interval and fans each returned change out to KafkaPublish.

Constructor
OpenStream(WikipediaClient wikipediaClient, KafkaPublish kafkaPublish)
Receives both collaborators via constructor injection.

stream()
OpenStream.java
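@Scheduled(fixedRate = 5000) // period measured from the start of the previous run
private void stream() {
    try {
        // One blocking HTTP call per tick
        Query query = wikipediaClient.getRecentChanges();
        // Publish each change individually; KafkaPublish.publish() is asynchronous
        for (Change change : query.getQuery().getRecentChanges()) {
            kafkaPublish.publish(change);
        }
    } catch (TooManyRequests ex) {
        // HTTP 429: pause this scheduler thread before returning
        log.warn("Request limit hit...sleeping for 5 seconds...");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler can shut down cleanly
            Thread.currentThread().interrupt();
        }
    } catch (Exception ex) {
        // Log with the stack trace; the next scheduled run simply tries again
        log.error("Exception occurred while polling Wikipedia", ex);
    }
}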
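When Wikipedia returns HTTP 429, stream() catches TooManyRequests and puts the current scheduler thread to sleep for 5 seconds. In practice this adds little back-off. With fixedRate = 5000, the next invocation becomes due while the thread is still asleep. Fixed-rate runs of the same task never overlap, so Spring starts that late run as soon as stream() returns. The sleep also occupies the scheduler thread, and Spring Boot's default task scheduler has only one. Any other @Scheduled methods in the application are therefore delayed as well.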
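One way to make the pause effective without blocking the scheduler thread is to let each tick decide whether to poll at all. The sketch below is an assumption, not part of the module. The class RateLimitBackoff and its methods are hypothetical names. It also swaps in an exponential policy (the delay doubles on consecutive 429s, up to a cap) in place of the module's fixed 5-second sleep. It depends only on java.time, so the logic can be tested without Spring.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper (not part of wiki-change-stream): lets each scheduled tick
 * decide whether to call Wikipedia, so an HTTP 429 pauses polling without
 * blocking the scheduler thread in Thread.sleep().
 */
final class RateLimitBackoff {
    private final Duration base;
    private final Duration max;
    private Duration current;
    private Instant resumeAt = Instant.MIN; // no back-off in effect initially

    RateLimitBackoff(Duration base, Duration max) {
        this.base = base;
        this.max = max;
        this.current = base;
    }

    /** Returns true if a tick running at {@code now} may send a request. */
    synchronized boolean mayPoll(Instant now) {
        return !now.isBefore(resumeAt);
    }

    /** Records an HTTP 429: skip ticks for the current delay, then double it up to {@code max}. */
    synchronized void onRateLimited(Instant now) {
        resumeAt = now.plus(current);
        Duration doubled = current.multipliedBy(2);
        current = doubled.compareTo(max) > 0 ? max : doubled;
    }

    /** Records a successful poll: the next 429 starts again from the base delay. */
    synchronized void onSuccess() {
        current = base;
    }

    /** Instant at which polling may resume. */
    synchronized Instant resumeAt() {
        return resumeAt;
    }
}
```

The gate would wrap the existing calls in stream():

- Guard the call to wikipediaClient.getRecentChanges() with backoff.mayPoll(Instant.now()).
- Call onSuccess() after the publish loop completes.
- Call onRateLimited(Instant.now()) in the TooManyRequests handler instead of Thread.sleep().

Skipped ticks return immediately. The 5-second fixedRate schedule keeps running and never holds up other scheduled tasks.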

WikipediaClient

Package: WikiChangeStream.clients
Annotations: @Component, @RequiredArgsConstructor
Wraps the reactive WebClient to fetch recent changes from the Wikipedia API. The base URL is configured by the AppConfig bean.

Field
Name | Type | Description
webClient | WebClient | Injected Spring bean; base URL pre-configured to the Wikipedia API endpoint

getRecentChanges()
WikipediaClient.java
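public Query getRecentChanges() {
    return this.webClient
            .get()                    // GET the pre-configured base URL
            .retrieve()
            // Map HTTP 429 to TooManyRequests; other 4xx/5xx statuses fall through
            // to retrieve()'s default WebClientResponseException
            .onStatus(httpStatusCode -> httpStatusCode.value() == 429, clientResponse ->
                    Mono.error(new TooManyRequests("Too many requests...")))
            .bodyToMono(Query.class)  // deserialize the JSON body into Query
            .block();                 // wait on the calling thread for the result
}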
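Returns the deserialized Query object, or throws TooManyRequests if the API responds with HTTP 429. Other 4xx and 5xx responses raise WebClientResponseException, which stream() logs in its generic catch block.

The .block() call subscribes to the reactive pipeline and waits on the calling thread, which makes the method synchronous. No timeout is passed, so a stalled request holds the scheduler thread for as long as it stalls. Mono#block(Duration) accepts an explicit timeout.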

KafkaPublish

Package: WikiChangeStream.publish
Annotations: @Component, @Slf4j
Creates and manages a KafkaProducer that serializes Change objects to JSON and sends them to the recent_change_stream topic.
KafkaPublish.java
package WikiChangeStream.publish;

import Wikicommon.models.Change;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Component;

import java.util.Properties;

@Component
@Slf4j
public class KafkaPublish implements DisposableBean {
    private static final String TOPIC = "recent_change_stream";
    private final KafkaProducer<String, Change> producer;

    KafkaPublish() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", StringSerializer.class.getName());
        // Spring Kafka's JsonSerializer writes each Change as a JSON document
        properties.put("value.serializer", JsonSerializer.class.getName());
        producer = new KafkaProducer<>(properties);
    }

    public void publish(Change recentChange) {
        // No key: the producer's default partitioner chooses the partition
        ProducerRecord<String, Change> record = new ProducerRecord<>(TOPIC, recentChange);

        // send() is asynchronous; the callback runs when the send succeeds or fails
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("Change published with title: {}", recentChange.getTitle());
            } else {
                log.error("Failed to publish change with title: {}", recentChange.getTitle(), exception);
            }
        });
    }

    // Wait for in-flight sends and release producer resources on context shutdown
    @Override
    public void destroy() {
        producer.close();
    }
}
Producer configuration
Property | Value
bootstrap.servers | localhost:9092
key.serializer | StringSerializer
value.serializer | JsonSerializer (Spring Kafka)
Topic | recent_change_stream
publish(Change recentChange)
Wraps recentChange in a ProducerRecord with no key and sends it asynchronously. Because the record has no key, the producer's default partitioner chooses the partition. The send callback logs the page title on success, or the error on failure.

AppConfig

Package: WikiChangeStream.config
Annotation: @Configuration
Declares the WebClient bean consumed by WikipediaClient.
AppConfig.java
@Configuration
public class AppConfig {

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .baseUrl("https://en.wikipedia.org/w/api.php?action=query&list=recentchanges&format=json&rclimit=100&rcprop=title|tags|ids")
                .defaultHeader("accept", "application/json")
                .build();
    }

}
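The base URL requests the 100 most recent changes (rclimit=100) and returns the title, tags, and ids properties for each entry. Adjust rclimit to trade throughput against API load. The API caps it at 500 for regular clients and 5000 for clients with higher limits, such as bots.

Each poll simply fetches the newest entries, without rcstart or rccontinue. As a result, consecutive polls overlap when fewer than rclimit changes occur between them, and the same change is published again. When more changes occur than rclimit, some are missed.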
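The overlap between polls can be filtered before publishing. The sketch below is a hypothetical illustration, not part of the module, and rests on three assumptions:

- The ids property yields an rcid for each entry.
- rcid increases with each new recent-changes entry.
- The API returns entries newest first by default.

RecentChangeRef and RecentChangeDeduplicator are hypothetical names; the real Change model lives in wiki-common.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical minimal view of one recentchanges entry (not the wiki-common Change class). */
record RecentChangeRef(long rcid, String title) {}

/**
 * Hypothetical filter that drops entries already returned by an earlier poll,
 * assuming rcid grows with each new recent-changes entry.
 */
final class RecentChangeDeduplicator {
    private long highestSeenRcid = Long.MIN_VALUE;

    /** Returns only entries not seen before, sorted oldest first for in-order publishing. */
    synchronized List<RecentChangeRef> unseen(List<RecentChangeRef> batch) {
        List<RecentChangeRef> fresh = new ArrayList<>();
        long maxInBatch = highestSeenRcid;
        for (RecentChangeRef change : batch) {
            if (change.rcid() > highestSeenRcid) {
                fresh.add(change);
                maxInBatch = Math.max(maxInBatch, change.rcid());
            }
        }
        highestSeenRcid = maxInBatch;
        // Ascending rcid so downstream consumers see changes in the order they happened
        fresh.sort(Comparator.comparingLong(RecentChangeRef::rcid));
        return fresh;
    }
}
```

The filter only removes duplicates. It cannot recover changes that fell outside a single 100-entry window, and its state is lost on restart. Paging with rcstart or rccontinue would address the gap at the cost of extra requests.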

TooManyRequests

Package: WikiChangeStream.exception
A RuntimeException subclass thrown by WikipediaClient when the Wikipedia API responds with HTTP 429 (Too Many Requests).
TooManyRequests.java
package WikiChangeStream.exception;

public class TooManyRequests extends RuntimeException {
    public TooManyRequests(String message) {
        super(message);
    }
}
OpenStream.stream() catches this exception and backs off for 5 seconds before the next poll attempt.
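If the 429 response carries a Retry-After header, TooManyRequests could carry that delay instead of a fixed 5 seconds. RFC 9110 allows the header to hold either a number of seconds or an HTTP-date. Whether Wikipedia sends this header is an assumption not covered by this page, and the RetryAfter class below is a hypothetical helper. In WikipediaClient's onStatus handler, the raw value could be read with clientResponse.headers().header("Retry-After").

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;

/** Hypothetical parser for an HTTP Retry-After header value (delta-seconds or HTTP-date). */
final class RetryAfter {
    private RetryAfter() {}

    /** Returns the wait relative to {@code now}, or empty if the value is missing or malformed. */
    static Optional<Duration> parse(String headerValue, Instant now) {
        if (headerValue == null || headerValue.isBlank()) {
            return Optional.empty();
        }
        String value = headerValue.trim();
        try {
            // Form 1: a non-negative number of seconds, e.g. "120"
            long seconds = Long.parseLong(value);
            return seconds >= 0 ? Optional.of(Duration.ofSeconds(seconds)) : Optional.empty();
        } catch (NumberFormatException notSeconds) {
            // Not delta-seconds; fall through and try the HTTP-date form
        }
        try {
            // Form 2: an IMF-fixdate such as "Wed, 21 Oct 2015 07:28:00 GMT"
            Instant until = ZonedDateTime.parse(value, DateTimeFormatter.RFC_1123_DATE_TIME).toInstant();
            Duration wait = Duration.between(now, until);
            // A date already in the past means "retry now"
            return Optional.of(wait.isNegative() ? Duration.ZERO : wait);
        } catch (DateTimeParseException notADate) {
            return Optional.empty();
        }
    }
}
```

When parse returns Optional.empty(), a caller would fall back to the existing 5-second pause.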

Maven Artifact

Group ID: com.as
Artifact ID: wiki-change-stream
Version: 0.0.1-SNAPSHOT
pom.xml
<dependency>
    <groupId>com.as</groupId>
    <artifactId>wiki-change-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
Key dependencies

wiki-common: shared data model classes (Change, Query, RecentChanges)
spring-boot-starter-webflux: reactive HTTP client (WebClient) for Wikipedia API calls
opensearch-wiki-indexer: downstream consumer of the Kafka topic produced by this module
