Documentation index: fetch the complete documentation index at https://mintlify.com/microsoft/mcp-for-beginners/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Real-time data streaming has become essential in today’s data-driven world, where businesses and applications require immediate access to information to make timely decisions. The Model Context Protocol (MCP) transforms real-time streaming by providing a standardized approach to context management across AI models, streaming platforms, and applications.
- Contextual continuity: Maintain relationships between data points across the entire pipeline.
- Optimized transmission: Reduce redundancy through intelligent context management.
- Standardized interfaces: Consistent APIs for all streaming components.
- Enhanced scalability: Horizontal scaling while preserving context.
MCP streaming architecture
Data Sources (IoT, APIs, DBs, Apps)
        │
        ▼
Streaming Connectors ──► Protocol Adapters ──► Context Handlers
                                                      │
                                                Context Store
                                                      │
                                              Stream Processors
                                               │             │
                                  Real-time Analytics     ML Models
                                               │
                                    Applications & Services
Core concepts
Challenges MCP addresses
| Challenge | MCP solution |
| --- | --- |
| Context loss across distributed components | Standardized context serialization per MCP spec |
| Scalability | Horizontal scaling with preserved context |
| Integration complexity | Protocol adapters for diverse streaming tech |
| Latency management | Efficient context handling reduces overhead |
| Data consistency | Stateful stream processing with unified context |
Apache Kafka integration
MCP can use Kafka as a transport layer by implementing a custom Transport class that bridges Kafka topics and MCP’s JSON-RPC protocol.
import asyncio
import json
from typing import Optional
from confluent_kafka import Consumer, Producer, KafkaError
from mcp.client import Client, ClientCapabilities
from mcp.core.message import JsonRpcMessage
from mcp.core.transports import Transport
class KafkaMCPTransport(Transport):
    """MCP transport that carries JSON-RPC messages over Kafka topics.

    A background task consumes ``input_topic``, decodes each record as
    UTF-8 JSON, converts it with ``JsonRpcMessage.from_dict`` and buffers
    it in an asyncio queue for ``read()``. ``write()`` serialises a message
    to JSON and produces it to ``output_topic``.
    """

    def __init__(
        self,
        bootstrap_servers: str,
        input_topic: str,
        output_topic: str
    ):
        """Create the Kafka producer and consumer; nothing is subscribed yet.

        Args:
            bootstrap_servers: Value used for Kafka's ``bootstrap.servers``.
            input_topic: Topic that incoming messages are consumed from.
            output_topic: Topic that outgoing messages are produced to.
        """
        self.bootstrap_servers = bootstrap_servers
        self.input_topic = input_topic
        self.output_topic = output_topic
        self.producer = Producer(
            {'bootstrap.servers': bootstrap_servers})
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'mcp-client-group',
            'auto.offset.reset': 'earliest'
        })
        self.message_queue = asyncio.Queue()
        self.running = False
        # Assigned by connect(). Initialised here so that close() does not
        # raise AttributeError when it runs before connect() ever did.
        self.consumer_task: Optional[asyncio.Task] = None

    async def connect(self):
        """Subscribe to the input topic and start the consumer task.

        Returns:
            This transport instance.
        """
        self.consumer.subscribe([self.input_topic])
        self.running = True
        self.consumer_task = asyncio.create_task(self._consume_messages())
        return self

    async def _consume_messages(self):
        """Move records from Kafka into ``message_queue`` until stopped."""
        while self.running:
            try:
                # poll() is a synchronous call. A positive timeout would
                # freeze the event loop (and every other coroutine) while
                # waiting, so poll without blocking and yield via sleep()
                # when there is nothing to read.
                msg = self.consumer.poll(0)
                if msg is None:
                    await asyncio.sleep(0.1)
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    print(f"Consumer error: {msg.error()}")
                    continue
                message_str = msg.value().decode('utf-8')
                message_data = json.loads(message_str)
                mcp_message = JsonRpcMessage.from_dict(message_data)
                await self.message_queue.put(mcp_message)
            except Exception as e:
                # Keep the loop alive on bad payloads or client errors;
                # back off briefly so a persistent failure does not spin.
                print(f"Error in consumer loop: {e}")
                await asyncio.sleep(1)

    async def read(self) -> Optional[JsonRpcMessage]:
        """Wait for and return the next message buffered by the consumer task."""
        return await self.message_queue.get()

    async def write(self, message: JsonRpcMessage) -> None:
        """Serialise ``message`` to JSON and produce it to the output topic.

        Args:
            message: The message to send; ``to_dict()`` must be JSON-serialisable.
        """
        message_json = json.dumps(message.to_dict())
        self.producer.produce(
            self.output_topic,
            message_json.encode('utf-8'),
            callback=self._delivery_report
        )
        # Zero timeout: give the producer a chance to run pending delivery
        # callbacks without waiting.
        self.producer.poll(0)

    def _delivery_report(self, err, msg):
        """Delivery callback passed to ``produce()``; reports failures only."""
        if err is not None:
            print(f'Message delivery failed: {err}')

    async def close(self) -> None:
        """Stop the consumer task, close the consumer and flush the producer."""
        self.running = False
        if self.consumer_task is not None:
            self.consumer_task.cancel()
            try:
                await self.consumer_task
            except asyncio.CancelledError:
                pass
            self.consumer_task = None
        self.consumer.close()
        self.producer.flush()
# Usage
async def kafka_mcp_example():
    """Run one tool call through an MCP client that talks over Kafka.

    Builds the client and a ``KafkaMCPTransport``, connects, initialises the
    session, executes the ``process_data`` tool, prints the response and
    shuts the client down. The transport is closed even if a step after
    connecting fails.
    """
    client_info = {"name": "kafka-mcp-client", "version": "1.0.0"}
    client = Client(client_info, ClientCapabilities({}))

    # The client reads responses from one topic and writes requests to the other.
    transport = KafkaMCPTransport(
        bootstrap_servers="localhost:9092",
        input_topic="mcp-responses",
        output_topic="mcp-requests"
    )

    await client.connect(transport)
    try:
        await client.initialize()

        tool_arguments = {
            "data": "sample data",
            "metadata": {
                "source": "sensor-1",
                "timestamp": "2025-06-12T10:30:00Z"
            }
        }
        response = await client.execute_tool("process_data", tool_arguments)
        print(f"Tool response: {response}")

        await client.shutdown()
    finally:
        await transport.close()


if __name__ == "__main__":
    asyncio.run(kafka_mcp_example())
Apache Pulsar integration
Pulsar provides a unified messaging and streaming platform with built-in acknowledgment semantics.
import asyncio
import json
import pulsar
from typing import Optional
from mcp.core.message import JsonRpcMessage
from mcp.core.transports import Transport
from mcp.server import Server, ServerOptions
from mcp.server.tools import Tool, ToolExecutionContext, ToolMetadata
class PulsarMCPTransport(Transport):
    """MCP transport that carries JSON-RPC messages over Apache Pulsar.

    Requests are received from ``request_topic`` through a shared
    subscription named ``mcp-server-subscription`` and buffered in an
    asyncio queue for ``read()``. ``write()`` sends responses to
    ``response_topic`` as UTF-8 encoded JSON.
    """

    def __init__(
        self,
        service_url: str,
        request_topic: str,
        response_topic: str
    ):
        """Open the Pulsar client, the response producer and the request consumer.

        Args:
            service_url: URL passed to ``pulsar.Client``.
            request_topic: Topic that incoming messages are received from.
            response_topic: Topic that outgoing messages are sent to.
        """
        self.client = pulsar.Client(service_url)
        self.producer = self.client.create_producer(response_topic)
        self.consumer = self.client.subscribe(
            request_topic,
            "mcp-server-subscription",
            consumer_type=pulsar.ConsumerType.Shared
        )
        self.message_queue = asyncio.Queue()
        self.running = False
        # Assigned by connect(). Initialised here so that close() does not
        # raise AttributeError when it runs before connect() ever did.
        self.consumer_task: Optional[asyncio.Task] = None

    async def connect(self):
        """Start the background consumer task.

        Returns:
            This transport instance.
        """
        self.running = True
        self.consumer_task = asyncio.create_task(self._consume_messages())
        return self

    def _receive_blocking(self):
        """Receive one message, waiting at most 500 ms (runs on a worker thread)."""
        return self.consumer.receive(timeout_millis=500)

    async def _consume_messages(self):
        """Move messages from Pulsar into ``message_queue`` until stopped."""
        loop = asyncio.get_running_loop()
        while self.running:
            try:
                # receive() is synchronous; run it on the default executor so
                # the event loop stays responsive while waiting for a message.
                msg = await loop.run_in_executor(None, self._receive_blocking)
                message_str = msg.data().decode('utf-8')
                message_data = json.loads(message_str)
                mcp_message = JsonRpcMessage.from_dict(message_data)
                await self.message_queue.put(mcp_message)
                # Acknowledge only after the message is safely buffered.
                self.consumer.acknowledge(msg)
            except pulsar.Timeout:
                # Nothing arrived within the receive window; this is the
                # normal idle case, so just re-check `running`.
                continue
            except Exception as e:
                # Real failures (bad payload, client errors) are reported
                # instead of being silently swallowed; the loop keeps going.
                print(f"Error in consumer loop: {e}")
                await asyncio.sleep(0.1)

    async def read(self) -> Optional[JsonRpcMessage]:
        """Wait for and return the next message buffered by the consumer task."""
        return await self.message_queue.get()

    async def write(self, message: JsonRpcMessage) -> None:
        """Serialise ``message`` to JSON and send it to the response topic.

        Args:
            message: The message to send; ``to_dict()`` must be JSON-serialisable.
        """
        message_json = json.dumps(message.to_dict())
        self.producer.send(message_json.encode('utf-8'))

    async def close(self) -> None:
        """Stop the consumer task, then close consumer, producer and client."""
        self.running = False
        task = self.consumer_task
        if task is not None:
            # The loop re-checks `running` after every receive window, so it
            # ends on its own shortly. Waiting instead of cancelling ensures
            # no receive() is still in flight on a worker thread when the
            # consumer is closed. asyncio.wait() does not re-raise the task's
            # own outcome, so cleanup below always runs.
            await asyncio.wait({task})
            self.consumer_task = None
        self.consumer.close()
        self.producer.close()
        self.client.close()
@Tool(
    name="process_streaming_data",
    description="Process streaming data with context preservation",
    metadata=ToolMetadata(required_capabilities=["streaming"])
)
async def process_streaming_data(
    ctx: ToolExecutionContext,
    data: str,
    source: str,
    priority: str = "medium"
) -> dict:
    """Echo the payload back together with the context it arrived in.

    Args:
        ctx: Execution context; its ``conversation_id`` attribute is used
            when present.
        data: Payload to process.
        source: Identifier of where the data came from.
        priority: Priority label copied into the result.

    Returns:
        A dict with ``processed_data`` (the payload prefixed with
        ``"Processed: "``) and a ``context`` dict holding the conversation
        id (``"unknown"`` if ``ctx`` has none), the source and the priority.
    """
    # Falls back to "unknown" when the context carries no conversation id.
    conversation_id = getattr(ctx, 'conversation_id', "unknown")

    context = {
        "conversation_id": conversation_id,
        "source": source,
        "priority": priority,
    }
    return {
        "processed_data": f"Processed: {data}",
        "context": context,
    }
async def run_mcp_server_with_pulsar():
    """Serve the ``process_streaming_data`` tool over a Pulsar transport.

    Creates the server with the ``streaming`` capability enabled, registers
    the tool, and runs the server on a ``PulsarMCPTransport``. The transport
    is closed when ``server.run`` returns or raises.
    """
    server_info = {"name": "pulsar-mcp-server", "version": "1.0.0"}
    server_options = ServerOptions(capabilities={"streaming": True})
    server = Server(server_info, server_options)
    server.register_tool(process_streaming_data)

    # The server reads requests from one topic and answers on the other.
    transport = PulsarMCPTransport(
        service_url="pulsar://localhost:6650",
        request_topic="mcp-requests",
        response_topic="mcp-responses"
    )

    try:
        await server.run(transport)
    finally:
        await transport.close()


if __name__ == "__main__":
    asyncio.run(run_mcp_server_with_pulsar())
Use cases
- IoT sensor networks: Preserve device context as data flows from edge gateways to cloud analytics.
- Financial trading: Ultra-low latency processing with transaction context for complex event detection.
- AI-driven analytics: Real-time model inference with context-aware feature extraction from streaming data.
Deployment best practices
- Design for fault tolerance: Implement dead-letter queues and idempotent processors for exactly-once semantics.
- Buffer sizes and batching: Configure appropriate buffer depths and use batching to maximize throughput.
- Monitor backpressure: Track consumer lag and implement backpressure signals to protect downstream systems.
- Encrypt sensitive streams: Use TLS and apply field-level encryption for PII or financial data in flight.