Every five seconds, OpenKnowledgeStream automatically polls the Wikipedia Recent Changes API and pushes the resulting events through a Kafka topic into an OpenSearch index. The flow is driven entirely by two independent @Scheduled methods (one that produces records and one that consumes them), with Kafka acting as the durable buffer between the two.
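As a rough mental model of this arrangement, the following plain-Java sketch runs a producer step and a consumer step on independent timers with a queue between them. It is an analogy rather than the project's code: PipelineSketch and its methods are hypothetical names, and the in-memory queue only stands in for the Kafka topic (unlike Kafka, it is not durable).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java analogy of the pipeline. All names here are hypothetical. */
final class PipelineSketch {
    // Stand-in for the Kafka topic: in-memory only, so not durable like Kafka.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    // Stand-in for the OpenSearch index.
    private final List<String> indexed = new CopyOnWriteArrayList<>();
    private final AtomicInteger sequence = new AtomicInteger();

    /** Producer step: stands in for fetching recent changes and publishing them. */
    void produceOnce() {
        buffer.add("change-" + sequence.incrementAndGet());
    }

    /** Consumer step: drains whatever is buffered, "indexes" it, and returns the batch size. */
    int consumeOnce() {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch);
        indexed.addAll(batch);
        return batch.size();
    }

    /** Snapshot of everything indexed so far. */
    List<String> indexed() {
        return List.copyOf(indexed);
    }

    /** Runs both steps on independent five-second timers, mirroring fixedRate = 5000. */
    ScheduledExecutorService start() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        scheduler.scheduleAtFixedRate(this::produceOnce, 0, 5, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(this::consumeOnce, 0, 5, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Because neither step calls the other, the consumer can fall behind or run ahead without affecting the producer; whatever has accumulated in the buffer is picked up on the consumer's next tick.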
Wikipedia API request
OpenStream.stream() is annotated with @Scheduled(fixedRate = 5000) and fires every 5000 ms inside the wiki-change-stream Spring Boot application. It delegates immediately to WikipediaClient.getRecentChanges(), which issues a GET request through a Spring WebFlux WebClient configured with the API's base URL.
The WebClient bean is created in AppConfig with an accept: application/json default header. The reactive pipeline terminates with .block(), which makes the call synchronous within the scheduler thread. Jackson deserializes the raw JSON response into a Query object.
Change extraction
The deserialized Query object contains a single RecentChanges field (also named query), which holds a List<Change> of up to 100 entries. OpenStream.stream() iterates over this list and passes each element to the Kafka publisher.
Each Change carries four fields mapped directly from the Wikipedia JSON response:

| Java field | JSON key | Type |
|---|---|---|
| type | type | String |
| title | title | String |
| pageId | pageid | Long |
| tags | tags | List<String> |
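The mapping in the table can be illustrated without any JSON library. The sketch below assumes each response entry has already been parsed into a Map; ChangeSketch, fromJson, and fromJsonList are hypothetical names, and the real application relies on Jackson for this step rather than hand-written code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative holder for the four documented fields; not the project's actual Change class. */
final class ChangeSketch {
    final String type;
    final String title;
    final Long pageId;
    final List<String> tags;

    ChangeSketch(String type, String title, Long pageId, List<String> tags) {
        this.type = type;
        this.title = title;
        this.pageId = pageId;
        this.tags = List.copyOf(tags);
    }

    /** Maps one parsed JSON object (modelled as a Map, an assumption) onto the Java fields. */
    static ChangeSketch fromJson(Map<String, Object> json) {
        // JSON key "pageid" maps to the Java field pageId; any numeric value is widened to Long.
        Object rawId = json.get("pageid");
        Long pageId = rawId instanceof Number ? Long.valueOf(((Number) rawId).longValue()) : null;

        // A missing or non-list "tags" value becomes an empty list.
        List<String> tags = new ArrayList<>();
        Object rawTags = json.get("tags");
        if (rawTags instanceof List) {
            for (Object tag : (List<?>) rawTags) {
                tags.add(String.valueOf(tag));
            }
        }
        return new ChangeSketch((String) json.get("type"), (String) json.get("title"), pageId, tags);
    }

    /** Converts every parsed entry in order, as the iteration in stream() would see them. */
    static List<ChangeSketch> fromJsonList(List<Map<String, Object>> entries) {
        List<ChangeSketch> changes = new ArrayList<>();
        for (Map<String, Object> entry : entries) {
            changes.add(fromJson(entry));
        }
        return changes;
    }
}
```

Only pageid differs in spelling between the JSON key and the Java field; the other three names carry over unchanged.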
Kafka publishing
KafkaPublish.publish() wraps each Change in a ProducerRecord and sends it to the recent_change_stream topic using a KafkaProducer<String, Change>. No explicit key is set, so the record key is null; the value is serialized to JSON by Spring Kafka’s JsonSerializer. An asynchronous Callback logs the outcome of each send.
Rate limit handling
WikipediaClient.getRecentChanges() maps an HTTP 429 Too Many Requests response to a TooManyRequests exception via the onStatus operator. OpenStream.stream() catches this exception and blocks the scheduler thread for 5 seconds, after which the next scheduled invocation fires as usual.
Any other unchecked exception is caught by a second catch (Exception ex) block that logs the error message and allows the next scheduled invocation to proceed normally.
Kafka consumption
KafkaConsume.consume() is annotated with @Scheduled(fixedRate = 5000) inside the opensearch-wiki-indexer module (component-scanned by OpenKnowledgeStreamApplication). On each invocation it calls consumer.poll(Duration.ofMillis(1000)), which blocks for up to one second when nothing is buffered and otherwise returns the records currently available on the recent_change_stream topic for the wiki-indexer consumer group (at most max.poll.records per call).
The consumer subscribes with Arrays.asList("recent_change_stream") and uses auto.offset.reset=earliest, so when the group has no committed offsets, as on first start, it reads the topic from the beginning.
OpenSearch indexing
For each consumed Change, OpensearchIndexer.index() calls the OpenSearch Java client’s index() API to upsert the document into the wiki-changes index. The page title is used as the document ID, so repeated changes to the same page overwrite the previous document rather than creating duplicates.
The OpenSearchClient bean is built in OpensearchConfig over a plain RestClient pointing at localhost:9200, using Jackson for JSON mapping via JacksonJsonpMapper.
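The effect of using the title as the document ID can be seen with an in-memory stand-in for the index. This is only a sketch: TitleKeyedIndexSketch is a hypothetical class backed by a Map, not the OpenSearch client, and each change is reduced to a {title, type} pair for brevity.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for the wiki-changes index (hypothetical, for illustration only). */
final class TitleKeyedIndexSketch {
    private final Map<String, String> documents = new LinkedHashMap<>();

    /** Stores a document under the given ID, replacing any document already stored there. */
    void index(String id, String document) {
        documents.put(id, document);
    }

    /** Indexes one consumed batch of {title, type} pairs, using the title as the document ID. */
    void indexBatch(List<String[]> batch) {
        for (String[] change : batch) {
            index(change[0], change[1]);
        }
    }

    /** Returns the document stored under the ID, or null if there is none. */
    String get(String id) {
        return documents.get(id);
    }

    /** Number of distinct documents currently held. */
    int size() {
        return documents.size();
    }
}
```

Indexing three changes where two share a title leaves two documents, and the shared title holds the most recent change. One consequence of this design is that the index reflects the latest state per page rather than a full change history.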