Working with Streams
Developers can choose to receive data from the LLM incrementally, as it is generated, using Embabel's streaming capabilities.
In addition to the raw text output of the LLM, Embabel streams can also carry the LLM's reasoning events (so-called "thinking") and a stream of objects created by the LLM. This feature aligns well with Embabel's focus on an object-oriented programming model.
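Turning a stream of text into a stream of objects means buffering arbitrary chunks until a complete record has arrived. The sketch below assumes, hypothetically, that the model emits one JSON object per line. The `LineAssembler` class is invented for illustration and is not Embabel's parser, which may work differently. It shows only the buffering step, not the JSON parsing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Embabel): buffers streamed text chunks and
 * releases only complete lines, assuming one JSON object per line.
 */
public class LineAssembler {
    private final StringBuilder buffer = new StringBuilder();

    /** Appends a chunk and returns every line it completed (without the '\n'). */
    public List<String> accept(String chunk) {
        buffer.append(chunk);
        List<String> complete = new ArrayList<>();
        int newline;
        while ((newline = buffer.indexOf("\n")) >= 0) {
            String line = buffer.substring(0, newline).trim();
            buffer.delete(0, newline + 1);
            if (!line.isEmpty()) {
                complete.add(line);
            }
        }
        return complete;
    }

    /** Returns whatever remains once the stream completes (may be empty). */
    public String flush() {
        String rest = buffer.toString().trim();
        buffer.setLength(0);
        return rest;
    }

    public static void main(String[] args) {
        LineAssembler assembler = new LineAssembler();
        // Chunk boundaries fall in the middle of objects, as they can in real streams.
        String[] chunks = {"{\"name\":\"Ju", "ly\"}\n{\"name\":", "\"August\"}\n"};
        for (String chunk : chunks) {
            for (String line : assembler.accept(chunk)) {
                System.out.println("complete object: " + line);
            }
        }
    }
}
```

Emitting objects only at line boundaries keeps the downstream consumer simple: every item it receives is a whole record, never a fragment.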
Concepts
- StreamingEvent - wraps either Thinking or a user Object
- StreamingPromptRunnerBuilder - runner with streaming capabilities
- Spring reactive programming support for the Spring AI ChatClient as the underlying infrastructure
- All reactive callbacks, such as doOnNext, doOnComplete, etc., are at the developer's disposal
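To make the idea of an event that wraps either thinking or an object concrete, here is a self-contained model of such a type. Everything in it (`StreamingEventSketch`, `Event`, `Thinking`, `Item`, `Month`) is a hypothetical stand-in written for illustration. Embabel's real `StreamingEvent` has its own accessors, such as the `isThinking()` and `getObject()` calls used in the examples below. The sketch requires Java 17 or later for sealed interfaces and records.

```java
import java.util.List;

/**
 * Hypothetical model (not Embabel's actual StreamingEvent) of an event that
 * carries either a piece of LLM reasoning ("thinking") or a typed object.
 */
public class StreamingEventSketch {

    sealed interface Event<T> permits Thinking, Item {}

    record Thinking<T>(String text) implements Event<T> {}

    record Item<T>(T value) implements Event<T> {}

    /** Example domain object, playing the role of MonthItem in the examples. */
    record Month(String name) {}

    /** Dispatches on the event kind, similar to the isThinking()/isObject() checks. */
    static String describe(Event<Month> event) {
        if (event instanceof Thinking<Month> thinking) {
            return "THINKING: " + thinking.text();
        } else if (event instanceof Item<Month> item) {
            return "OBJECT: " + item.value().name();
        }
        throw new IllegalStateException("unknown event " + event);
    }

    public static void main(String[] args) {
        List<Event<Month>> events = List.of(
                new Thinking<>("comparing summer months"),
                new Item<>(new Month("July")),
                new Item<>(new Month("August")));
        events.forEach(e -> System.out.println(describe(e)));
    }
}
```

A sealed hierarchy documents that there are exactly two event kinds, so a consumer only has to handle thinking and objects.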
Example - Simple Thinking and Object Streaming with Callbacks
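// JDK and Reactor imports; Embabel types (PromptRunner, StreamingPromptRunnerBuilder,
// StreamingEvent) must also be imported from your Embabel version.
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

// Assumes an SLF4J `logger`, an injected Embabel `ai` instance, and user-defined
// Tooling and MonthItem (exposing getName()) types.
PromptRunner runner = ai.withLlm("qwen3:latest")
        .withToolObject(Tooling.class);

String prompt = "What are exactly the two hottest months in Florida, and what are their respective highest temperatures?";

// Thread-safe holders, since callbacks may run on a different thread
List<String> receivedEvents = new CopyOnWriteArrayList<>();
AtomicReference<Throwable> errorOccurred = new AtomicReference<>();
AtomicBoolean completionCalled = new AtomicBoolean(false);

// From Java, use StreamingPromptRunnerBuilder instead of the Kotlin extension function
Flux<StreamingEvent<MonthItem>> results = new StreamingPromptRunnerBuilder(runner)
        .streaming()
        .withPrompt(prompt)
        .createObjectStreamWithThinking(MonthItem.class);

// Attach reactive callbacks; blockLast() subscribes and waits for the stream to finish
results
        .timeout(Duration.ofSeconds(150))
        .doOnSubscribe(subscription -> logger.info("Stream subscription started"))
        .doOnNext(event -> {
            if (event.isThinking()) {
                String content = event.getThinking();
                receivedEvents.add("THINKING: " + content);
                logger.info("Received thinking: {}", content);
            } else if (event.isObject()) {
                MonthItem obj = event.getObject();
                receivedEvents.add("OBJECT: " + obj.getName());
                logger.info("Received object: {}", obj.getName());
            }
        })
        .doOnError(error -> {
            errorOccurred.set(error);
            logger.error("Stream error: {}", error.getMessage());
        })
        .doOnComplete(() -> {
            completionCalled.set(true);
            logger.info("Stream completed successfully");
        })
        .blockLast(Duration.ofSeconds(6000));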
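Reactor's Flux implements the Reactive Streams Publisher contract. The doOnSubscribe, doOnNext, doOnError and doOnComplete operators in the example above observe its four lifecycle signals. The JDK mirrors the same signals in `java.util.concurrent.Flow` (Java 9+). The following sketch uses that API with `SubmissionPublisher` to show the order in which the callbacks fire, without Spring or Reactor on the classpath. The plain strings stand in for StreamingEvent values, and the class name `LifecycleSketch` is illustrative only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** JDK-only illustration of the signals behind Reactor's doOn* callbacks. */
public class LifecycleSketch {

    static List<String> run(List<String> events) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription s) { // ~ doOnSubscribe
                    log.add("subscribed");
                    s.request(Long.MAX_VALUE); // unbounded demand: deliver items as they arrive
                }

                @Override
                public void onNext(String item) { // ~ doOnNext
                    log.add("next: " + item);
                }

                @Override
                public void onError(Throwable error) { // ~ doOnError
                    log.add("error: " + error.getMessage());
                    done.countDown();
                }

                @Override
                public void onComplete() { // ~ doOnComplete
                    log.add("complete");
                    done.countDown();
                }
            });
            events.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items are delivered

        // Bounded wait for a terminal signal, in the spirit of blockLast(Duration)
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return log;
    }

    public static void main(String[] args) {
        run(List.of("THINKING: comparing summer months", "OBJECT: July", "OBJECT: August"))
                .forEach(System.out::println);
    }
}
```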
Example - Simple Raw Text Streaming with Callbacks
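// JDK and Reactor imports; Embabel types (PromptRunner, StreamingPromptRunnerBuilder)
// must also be imported from your Embabel version.
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

// Assumes an SLF4J `logger` and an injected Embabel `ai` instance.
PromptRunner runner = ai.withLlm("qwen3:latest");

String prompt = "What is the tallest building in Paris?";

// Thread-safe holders, since callbacks may run on a different thread
List<String> receivedTextChunks = new CopyOnWriteArrayList<>();
AtomicReference<Throwable> errorOccurred = new AtomicReference<>();
AtomicBoolean completionCalled = new AtomicBoolean(false);

// From Java, use StreamingPromptRunnerBuilder instead of the Kotlin extension function
Flux<String> results = new StreamingPromptRunnerBuilder(runner)
        .streaming()
        .withPrompt(prompt)
        .generateStream();

// Attach reactive callbacks; blockLast() subscribes and waits for the stream to finish
results
        .timeout(Duration.ofSeconds(150))
        .doOnSubscribe(subscription -> logger.info("Stream subscription started"))
        .doOnNext(content -> {
            receivedTextChunks.add(content);
            logger.info("Received text chunk: {}", content);
        })
        .doOnError(error -> {
            errorOccurred.set(error);
            logger.error("Stream error: {}", error.getMessage());
        })
        .doOnComplete(() -> {
            completionCalled.set(true);
            logger.info("Stream completed successfully");
        })
        .blockLast(Duration.ofSeconds(6000));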
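In the raw text example, the value returned by `blockLast` is only the final chunk, so the full answer has to be assembled from the individual chunks. The following JDK-only sketch uses `java.util.concurrent.Flow` in place of Reactor, with the hypothetical class name `TextAggregator`. It forwards each chunk to a callback, much like `doOnNext`. It also completes a `CompletableFuture` with the concatenated text on completion, or with the error on failure. In Reactor itself, the same accumulation could be expressed with an operator such as `reduce` or `collect` on the Flux. The sketch only shows the mechanics.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical JDK-only sketch: passes each text chunk to a callback and
 * assembles the complete answer, exposed as a CompletableFuture.
 */
public class TextAggregator implements Flow.Subscriber<String> {
    private final StringBuilder text = new StringBuilder();
    private final Consumer<String> onChunk;
    private final CompletableFuture<String> result = new CompletableFuture<>();

    public TextAggregator(Consumer<String> onChunk) {
        this.onChunk = onChunk;
    }

    public CompletableFuture<String> result() {
        return result;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String chunk) {
        onChunk.accept(chunk); // per-chunk side effect, e.g. printing progressively
        text.append(chunk);    // onNext signals are serialized, so no extra locking
    }

    @Override
    public void onError(Throwable error) {
        result.completeExceptionally(error);
    }

    @Override
    public void onComplete() {
        result.complete(text.toString());
    }

    /** Streams the given chunks through a publisher and returns the joined text. */
    public static String collect(List<String> chunks, Consumer<String> onChunk) {
        TextAggregator aggregator = new TextAggregator(onChunk);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(aggregator);
            chunks.forEach(publisher::submit);
        }
        // orTimeout bounds the wait; join() rethrows any stream error
        return aggregator.result().orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        String answer = collect(List.of("Streaming ", "text ", "arrives in chunks."), System.out::print);
        System.out.println();
        System.out.println("Full answer: " + answer);
    }
}
```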




