Every five seconds, OpenKnowledgeStream automatically polls the Wikipedia Recent Changes API and pushes the resulting events through a Kafka topic into an OpenSearch index. The flow is entirely driven by two independent @Scheduled methods — one that produces records and one that consumes them — with Kafka acting as the durable buffer between the two.
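The two-scheduler shape described above can be sketched with JDK classes only. This is an illustrative analogue, not the project's code: a `LinkedBlockingQueue` stands in for the Kafka topic, a list stands in for the OpenSearch index, and the class name `PipelineSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PipelineSketch {
    // Stand-in for the Kafka topic: decouples the producer from the consumer.
    final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    // Stand-in for the OpenSearch index.
    final List<String> indexed = Collections.synchronizedList(new ArrayList<>());

    // Producer side: push one batch of page titles into the buffer.
    void produce(List<String> titles) {
        buffer.addAll(titles);
    }

    // Consumer side: drain whatever is buffered and "index" it; returns the batch size.
    int consume() {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch);
        indexed.addAll(batch);
        return batch.size();
    }

    // Runs both sides at a fixed rate on independent schedules.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        scheduler.scheduleAtFixedRate(() -> produce(List.of("Example page")),
                0, periodMillis, TimeUnit.MILLISECONDS);
        scheduler.scheduleAtFixedRate(this::consume,
                0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Calling `start(5000)` mirrors the five-second cadence; the returned scheduler should be stopped with `shutdownNow()` when finished. Because the buffer sits between the two tasks, either side can stall without losing records that were already produced.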

1. Wikipedia API request

OpenStream.stream() is annotated with @Scheduled(fixedRate = 5000) and fires every 5 000 ms inside the wiki-change-stream Spring Boot application. It delegates immediately to WikipediaClient.getRecentChanges(), which issues a synchronous GET request through a Spring WebFlux WebClient configured with the base URL:
https://en.wikipedia.org/w/api.php?action=query&list=recentchanges&format=json&rclimit=100&rcprop=title|tags|ids
The WebClient bean is created in AppConfig with a default Accept: application/json header. The reactive pipeline ends with .block(), which makes the call synchronous on the scheduler thread, and Jackson deserializes the raw JSON response into a Query object.
// WikiChangeStream/clients/WikipediaClient.java
public Query getRecentChanges() {
    return this.webClient
            .get()
            .retrieve()
            .onStatus(httpStatusCode -> httpStatusCode.value() == 429, clientResponse ->
                    Mono.error(new TooManyRequests("Too many requests...")))
            .bodyToMono(Query.class)
            .block();
}
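For readers who want to reproduce the request outside Spring, the following is a minimal JDK-only sketch of the same GET. It is not the project's WebClient code; the class name `RecentChangesRequest` is hypothetical. One detail worth noting is that `java.net.URI` rejects a raw `|` character, so the `rcprop` value is percent-encoded here.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class RecentChangesRequest {

    // Query parameters copied from the base URL shown above.
    static Map<String, String> params() {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("action", "query");
        p.put("list", "recentchanges");
        p.put("format", "json");
        p.put("rclimit", "100");
        p.put("rcprop", "title|tags|ids");
        return p;
    }

    // Percent-encodes each value so that '|' becomes %7C.
    static URI buildUri() {
        String query = params().entrySet().stream()
                .map(e -> e.getKey() + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return URI.create("https://en.wikipedia.org/w/api.php?" + query);
    }

    // Builds the request without sending it; the Accept header matches AppConfig.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(buildUri())
                .header("Accept", "application/json")
                .GET()
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(RecentChangesRequest.buildRequest(), HttpResponse.BodyHandlers.ofString())`, which blocks the calling thread in the same way `.block()` does in the WebClient version.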

2. Change extraction

The deserialized Query object has a single field, query, of type RecentChanges, which in turn holds a List<Change> of up to 100 entries (the rclimit value in the request). OpenStream.stream() iterates over this list and passes each element to the Kafka publisher:
// WikiChangeStream/service/OpenStream.java
Query query = wikipediaClient.getRecentChanges();
for (Change change : query.getQuery().getRecentChanges()) {
    kafkaPublish.publish(change);
}
Each Change carries four fields mapped directly from the Wikipedia JSON response:
Java field   JSON key   Type
type         type       String
title        title      String
pageId       pageid     Long
tags         tags       List<String>
// Wikicommon/models/Change.java
@Data
public class Change {
    private String type;
    private String title;
    @JsonProperty("pageid")
    private Long pageId;
    private List<String> tags;
}
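The traversal in OpenStream.stream() dereferences each level of the response directly, so a payload that lacks the query object or its recentchanges array could surface as a NullPointerException. A defensive variant might look like the sketch below. The nested classes are simplified stand-ins for the project's models (plain fields instead of Lombok getters), and `ChangeExtraction.extract` is a hypothetical helper.

```java
import java.util.List;

class ChangeExtraction {

    // Simplified stand-ins for the project's Change, RecentChanges and Query models.
    static final class Change {
        final String title;
        Change(String title) { this.title = title; }
    }

    static final class RecentChanges {
        final List<Change> recentChanges;
        RecentChanges(List<Change> recentChanges) { this.recentChanges = recentChanges; }
    }

    static final class Query {
        final RecentChanges query;
        Query(RecentChanges query) { this.query = query; }
    }

    // Returns an empty list instead of throwing when any level of the response is missing.
    static List<Change> extract(Query response) {
        if (response == null
                || response.query == null
                || response.query.recentChanges == null) {
            return List.of();
        }
        return response.query.recentChanges;
    }
}
```

With a helper like this, the publishing loop simply iterates zero times on an empty or malformed response.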

3. Kafka publishing

KafkaPublish.publish() wraps each Change in a ProducerRecord and sends it to the recent_change_stream topic using a KafkaProducer<String, Change>. The record key is null (no explicit key is set); the value is serialized to JSON by Spring Kafka’s JsonSerializer. An asynchronous Callback logs the outcome:
// WikiChangeStream/publish/KafkaPublish.java
public void publish(Change recentChange) {
    ProducerRecord<String, Change> record = new ProducerRecord<>(topic, recentChange);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                log.info("Change published with title: {}", recentChange.getTitle());
            } else {
                log.error("Exception occurred while publish: {}", exception.getMessage());
            }
        }
    });
}
The producer is configured with the following properties:
// Kafka producer properties (KafkaPublish constructor)
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer",   StringSerializer.class.getName());
properties.put("value.serializer", JsonSerializer.class.getName());
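The broker address above is hard-coded. One way to keep the same defaults while allowing an override is sketched below. It is an assumption-laden illustration rather than the project's constructor: `ProducerProps` is a hypothetical class, and the serializers are given by their fully qualified class names, which Kafka accepts in place of `Class.getName()`.

```java
import java.util.Properties;

class ProducerProps {

    // Builds producer properties, falling back to localhost:9092 when no override is given.
    static Properties build(String bootstrapOverride) {
        String servers = (bootstrapOverride == null || bootstrapOverride.isBlank())
                ? "localhost:9092"
                : bootstrapOverride;

        Properties p = new Properties();
        p.put("bootstrap.servers", servers);
        p.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return p;
    }
}
```

A caller could pass `System.getenv("KAFKA_BOOTSTRAP_SERVERS")`, where the variable name is purely illustrative; an unset variable yields `null` and therefore the local default.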

4. Rate limit handling

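WikipediaClient.getRecentChanges() maps an HTTP 429 Too Many Requests response to a TooManyRequests exception via the onStatus operator. OpenStream.stream() catches this exception and sleeps on the scheduler thread for 5 seconds. Because the method runs at a fixed rate of 5 seconds, the next invocation is already due when the sleep ends and, under Spring's default single-threaded scheduler, starts as soon as the current one returns: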
// WikiChangeStream/service/OpenStream.java
} catch (TooManyRequests ex) {
    log.warn("Request limit hit...sleeping for 5 seconds...");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
Any other exception is caught by a second catch (Exception ex) block, which logs the error message so that the next scheduled invocation proceeds normally.
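The sleep in the catch block wraps InterruptedException in a RuntimeException, which discards the thread's interrupt status. A common alternative, sketched here with a hypothetical `Backoff.pause` helper, restores the flag and reports whether the pause completed, leaving the decision to the caller.

```java
class Backoff {

    // Sleeps for the given duration.
    // Returns true if the full pause elapsed, false if the thread was interrupted.
    static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In the rate-limit branch this would be called as `Backoff.pause(5000)`; a `false` result typically means the application is shutting down and the poll should be skipped.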

5. Kafka consumption

KafkaConsume.consume() is annotated with @Scheduled(fixedRate = 5000) inside the opensearch-wiki-indexer module (component-scanned by OpenKnowledgeStreamApplication). Each invocation calls consumer.poll(Duration.ofMillis(1000)), which waits up to one second and returns the records fetched from the recent_change_stream topic for the wiki-indexer consumer group. A single poll returns at most max.poll.records entries (500 by default in the Kafka client):
// WikiIndexer/consumer/KafkaConsume.java
@Scheduled(fixedRate = 5000)
public void consume() throws Exception {
    log.info("Starting kafka consumer....");
    ConsumerRecords<String, Change> record = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, Change> change : record) {
        opensearchIndexer.index(change.value());
    }
}
The consumer is configured with the following properties:
// Kafka consumer properties (KafkaConsume constructor)
properties.put("bootstrap.servers",  "localhost:9092");
properties.put("key.deserializer",   StringDeserializer.class.getName());
properties.put("value.deserializer", JsonDeserializer.class.getName());
properties.put("group.id",           "wiki-indexer");
properties.put("auto.offset.reset",  "earliest");
properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
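The consumer subscribes to Arrays.asList("recent_change_stream"). Because auto.offset.reset is earliest, a group with no committed offset (for example, on first start) begins at the oldest record still retained in the topic; once offsets are committed, it resumes from the last committed position.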
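The effect of auto.offset.reset can be summarized as a small decision function. The sketch below is a simplified model for illustration only, not Kafka client code: `OffsetReset.startPosition` is a hypothetical helper, and it ignores the out-of-range case, where the same policy also applies.

```java
class OffsetReset {

    // Chooses where a consumer starts reading a partition.
    // committed: the group's committed offset, or null if none exists.
    // logStart / logEnd: oldest retained offset and next offset to be written.
    static long startPosition(Long committed, long logStart, long logEnd, String policy) {
        if (committed != null) {
            return committed; // a committed offset always takes precedence
        }
        switch (policy) {
            case "earliest":
                return logStart; // replay everything still retained
            case "latest":
                return logEnd;   // only records produced from now on
            default:
                throw new IllegalStateException(
                        "No committed offset and reset policy is " + policy);
        }
    }
}
```

With the configuration above, a brand-new wiki-indexer group on a partition holding offsets 0 to 41 would start at 0, whereas a restart after committing offset 30 would resume at 30.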

6. OpenSearch indexing

For each consumed Change, OpensearchIndexer.index() calls the OpenSearch Java client’s index() API to upsert the document into the wiki-changes index. The page title is used as the document ID, meaning repeated changes to the same page overwrite the previous document rather than creating duplicates:
// WikiIndexer/index/OpensearchIndexer.java
public void index(Change change) throws Exception {
    openSearchClient.index(i -> i
            .index("wiki-changes")
            .id(change.getTitle())
            .document(change));
    log.info("Indexed Title: {}", change.getTitle());
}
The OpenSearchClient bean is built in OpensearchConfig over a plain RestClient pointing at localhost:9200, using Jackson for JSON mapping via JacksonJsonpMapper:
// WikiIndexer/config/OpensearchConfig.java
RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9200)
).build();

OpenSearchTransport transport = new RestClientTransport(
        restClient,
        new JacksonJsonpMapper()
);

return new OpenSearchClient(transport);
Because the page title is the document ID, the wiki-changes index always holds the most recent observed state for each Wikipedia page title. Rapidly edited pages will reflect only their latest polled change.
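The last-write-wins behaviour of a title-keyed index can be modelled with an in-memory map. This is a toy stand-in rather than OpenSearch itself; `TitleKeyedIndex` is a hypothetical class, and the "created"/"updated" strings mirror the result values an index request reports.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TitleKeyedIndex {
    // Document ID (page title) mapped to the latest change type seen for that title.
    private final Map<String, String> docs = new LinkedHashMap<>();

    // Stores the document under its title, replacing any earlier one with the same ID.
    String index(String title, String type) {
        boolean existed = docs.containsKey(title);
        docs.put(title, type);
        return existed ? "updated" : "created";
    }

    String get(String title) {
        return docs.get(title);
    }

    int size() {
        return docs.size();
    }
}
```

Indexing two changes for the same title leaves a single document holding the second change, which is why the index reflects current state per page rather than a full edit history.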

End-to-end sequence summary

Wikipedia API
    │
    │  GET /w/api.php?action=query&list=recentchanges&...
    │  ◄──── every 5 s (WikipediaClient via WebClient) ────
    │
    │  JSON { query: { recentchanges: [ {type, title, pageid, tags}, ... ] } }
    │  ────────────────────────────────────────────────────►
    │
    ▼
OpenStream.stream()
    │  iterates List<Change> (up to 100 entries)
    ▼
KafkaPublish.publish(change)
    │  ProducerRecord → topic: recent_change_stream
    │  value serialized with JsonSerializer
    ▼
Kafka  (localhost:9092)
    │  topic: recent_change_stream
    ▼
KafkaConsume.consume()   ◄── every 5 s, consumer.poll(1 000 ms)
    │  group.id: wiki-indexer, auto.offset.reset: earliest
    ▼
OpensearchIndexer.index(change)
    │  index: wiki-changes, id: change.getTitle()
    ▼
OpenSearch  (localhost:9200)
    index: wiki-changes
