top | item 39920986

zainhoda | 1 year ago

Looks really interesting! I saw something in the code about streaming. Could you explain that a bit more?


elijahbenizzy | 1 year ago

Yep! There's a streaming API. It's a little more technically involved, but you can stream results back at the point where the application "halts" -- that is, where it pauses and hands control back to the user. State is only updated after the stream completes.
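The core pattern -- yield deltas to the caller as they arrive, commit state exactly once after the stream finishes -- can be sketched without any framework. This is an illustrative sketch with made-up names, not Burr's API:

```python
from typing import Generator, List, Tuple

# Illustrative sketch (not Burr's API): stream chunks out immediately,
# but only commit the "state" update once the whole stream is done.
def stream_chunks(chunks: List[str]) -> Generator[dict, None, Tuple[dict, dict]]:
    buffer = []
    for delta in chunks:
        buffer.append(delta)
        yield {"response": delta}  # streamed to the caller right away
    full = "".join(buffer)
    # nothing above this line mutates state; the commit happens once, here
    return {"response": full}, {"chat_history": [full]}

def consume(chunks: List[str]):
    """Drive the generator, collecting deltas plus the final (result, state)."""
    gen = stream_chunks(chunks)
    deltas = []
    while True:
        try:
            deltas.append(next(gen)["response"])
        except StopIteration as stop:
            final_result, final_state = stop.value
            return deltas, final_result, final_state
```

The generator's `return` value rides along on `StopIteration`, which is how the final result and committed state reach the driver only after every delta has been streamed.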

Technical details follow:

You can define an action like this:

    from typing import Generator, Tuple

    import openai
    from burr.core import State
    from burr.core.action import streaming_action

    @streaming_action(reads=["prompt"], writes=["response"])
    def streaming_chat_call(state: State, **run_kwargs) -> Generator[dict, None, Tuple[dict, State]]:
        client = openai.Client()
        response = client.chat.completions.create(
            ...,  # model, messages, etc. elided
            stream=True,
        )
        buffer = []
        for chunk in response:
            delta = chunk.choices[0].delta.content
            if delta is None:  # the final chunk carries no content
                continue
            buffer.append(delta)
            yield {'response': delta}
        full_response = ''.join(buffer)
        # intermediate yields never touch state -- it is committed only here
        return {'response': full_response}, state.append(response=full_response)
Then you would call the `application.stream_result()` method, which gives you back a container object that you can stream to the user:

    action_we_just_ran, streaming_result_container = application.stream_result(...)
    print(f"getting streaming results for action={action_we_just_ran.name}")

    for result_component in streaming_result_container:
        print(result_component['response'])  # assumes your result has a 'response' key

    # block until the stream completes, then get the final result
    final_state, final_result = streaming_result_container.get()
It's nice in a web server or a Streamlit app, where you can use streaming responses to connect to the frontend. Here's how we use it in a Streamlit app -- we plan a streaming web-server example soon: https://github.com/DAGWorks-Inc/burr/blob/main/examples/stre....
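Since the container iterates like any generator, hooking it to a frontend usually just means adapting it into plain text chunks the web framework can consume. A minimal sketch -- the `'response'` key matches the action above; the adapter itself is an assumption, not part of Burr:

```python
from typing import Iterable, Iterator

def response_text(container: Iterable[dict]) -> Iterator[str]:
    """Adapt a stream of {'response': delta} dicts into plain text chunks,
    suitable for e.g. Streamlit's st.write_stream or an SSE response body."""
    for item in container:
        yield item["response"]

# any iterable of result dicts stands in for the real streaming container here
chunks = list(response_text([{"response": "Hi "}, {"response": "there"}]))
```

The same adapter works unchanged whether the consumer is a Streamlit widget or a server-sent-events endpoint, since both just pull strings off an iterator.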