Working with Streams

Developers can receive LLM output incrementally, as it is generated, by using Embabel's streaming capabilities.

In addition to the raw text output of the LLM, Embabel streams can carry LLM reasoning events (so-called "thinking") and a stream of objects created by the LLM. This fits Embabel's focus on an object-oriented programming model.

Concepts

  • StreamingEvent - wraps either a thinking event or a user-defined object
  • StreamingPromptRunnerBuilder - wraps a PromptRunner to add streaming capabilities
  • Spring's reactive programming support for the Spring AI ChatClient serves as the underlying infrastructure
  • All reactive callbacks, such as doOnNext and doOnComplete, are at the developer's disposal
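The "thinking or object" wrapper described above can be illustrated without any Embabel or Reactor dependency. The sketch below is only an approximation: the nested Event class is a hypothetical stand-in that mirrors the isThinking/isObject/getThinking/getObject accessors used in the examples on this page, and MonthItem is a hypothetical domain type. Neither is Embabel's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamingEventSketch {

    // Hypothetical stand-in for StreamingEvent: exactly one of the two fields is set.
    static final class Event<T> {
        private final String thinking;
        private final T object;

        private Event(String thinking, T object) {
            this.thinking = thinking;
            this.object = object;
        }

        static <T> Event<T> thinking(String content) { return new Event<>(content, null); }
        static <T> Event<T> object(T value) { return new Event<>(null, value); }

        boolean isThinking() { return thinking != null; }
        boolean isObject() { return object != null; }
        String getThinking() { return thinking; }
        T getObject() { return object; }
    }

    // Hypothetical domain type, assumed to expose getName() as in the examples.
    static final class MonthItem {
        private final String name;
        MonthItem(String name) { this.name = name; }
        String getName() { return name; }
    }

    // Dispatches on the event kind, the same branching a doOnNext callback would perform.
    static List<String> describe(List<Event<MonthItem>> events) {
        List<String> out = new ArrayList<>();
        for (Event<MonthItem> event : events) {
            if (event.isThinking()) {
                out.add("THINKING: " + event.getThinking());
            } else if (event.isObject()) {
                out.add("OBJECT: " + event.getObject().getName());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event<MonthItem>> events = List.of(
                Event.<MonthItem>thinking("Comparing summer months"),
                Event.object(new MonthItem("July")),
                Event.object(new MonthItem("August")));
        describe(events).forEach(System.out::println);
    }
}
```

In a real stream the events arrive one at a time, so the branching inside describe would live in a reactive callback rather than in a loop over a list.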

Example - Simple Thinking and Object Streaming with Callbacks


        PromptRunner runner = ai.withLlm("qwen3:latest")
                                .withToolObject(Tooling.class);

        String prompt = "What are the two hottest months in Florida, and what is the highest temperature of each?";

        // Use StreamingPromptRunnerBuilder (Java) instead of the Kotlin extension function
        Flux<StreamingEvent<MonthItem>> results = new StreamingPromptRunnerBuilder(runner)
                .streaming()
                .withPrompt(prompt)
                .createObjectStreamWithThinking(MonthItem.class);

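        // Attach reactive callbacks; blockLast() subscribes and waits for the stream to finish.
        // receivedEvents, errorOccurred, completionCalled, and logger are declared elsewhere in the enclosing test (not shown).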
        results
                .timeout(Duration.ofSeconds(150))
                .doOnSubscribe(subscription -> {
                    logger.info("Stream subscription started");
                })
                .doOnNext(event -> {
                    if (event.isThinking()) {
                        String content = event.getThinking();
                        receivedEvents.add("THINKING: " + content);
                        logger.info("Integration test received thinking: {}", content);
                    } else if (event.isObject()) {
                        MonthItem obj = event.getObject();
                        receivedEvents.add("OBJECT: " + obj.getName());
                        logger.info("Integration test received object: {}", obj.getName());
                    }
                })
                .doOnError(error -> {
                    errorOccurred.set(error);
                    logger.error("Integration test stream error: {}", error.getMessage());
                })
                .doOnComplete(() -> {
                    completionCalled.set(true);
                    logger.info("Integration test stream completed successfully");
                })
                .blockLast(Duration.ofSeconds(6000));

Example - Simple Raw Text Streaming with Callbacks


        PromptRunner runner = ai.withLlm("qwen3:latest");

        String prompt = "What is the highest building in Paris?";

        // Use StreamingPromptRunnerBuilder (Java) instead of the Kotlin extension function
        Flux<String> results = new StreamingPromptRunnerBuilder(runner)
                .streaming()
                .withPrompt(prompt)
                .generateStream();

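        // Attach reactive callbacks; blockLast() subscribes and waits for the stream to finish.
        // receivedTextChunks, errorOccurred, completionCalled, and logger are declared elsewhere in the enclosing test (not shown).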
        results
                .timeout(Duration.ofSeconds(150))
                .doOnSubscribe(subscription -> {
                    logger.info("Stream subscription started");
                })
                .doOnNext(content -> {
                    receivedTextChunks.add(content);
                    logger.info("Integration test received text chunk: {}", content);
                })
                .doOnError(error -> {
                    errorOccurred.set(error);
                    logger.error("Integration test stream error: {}", error.getMessage());
                })
                .doOnComplete(() -> {
                    completionCalled.set(true);
                    logger.info("Integration test stream completed successfully");
                })
                .blockLast(Duration.ofSeconds(6000));
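
Because raw text arrives in chunks, a caller usually wants to reassemble the full response once the stream completes. The following is a minimal sketch in plain Java; ChunkAssembler is a hypothetical helper introduced here for illustration and is not part of Embabel.

```java
public class ChunkAssembler {

    // StringBuffer is synchronized, so chunks may be appended from the thread that drives the stream.
    private final StringBuffer buffer = new StringBuffer();
    private int chunkCount = 0;

    // Appends one streamed chunk in arrival order.
    public synchronized void accept(String chunk) {
        buffer.append(chunk);
        chunkCount++;
    }

    // Returns the text assembled so far.
    public String text() {
        return buffer.toString();
    }

    // Returns how many chunks have been received.
    public synchronized int chunkCount() {
        return chunkCount;
    }

    public static void main(String[] args) {
        ChunkAssembler assembler = new ChunkAssembler();
        // Simulated chunks; in the example above they would come from the Flux<String>.
        for (String chunk : new String[] {"The ", "Eiffel ", "Tower"}) {
            assembler.accept(chunk);
        }
        System.out.println(assembler.text());
        System.out.println(assembler.chunkCount());
    }
}
```

Since doOnNext accepts any consumer of the element type, a method reference such as assembler::accept could be passed to it in place of the lambda, with text() read after blockLast returns.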
