I'm using the Textual framework for a simple TUI.
The task is to display the results of ChatGPT-prompts as they are streamed.
Problem is: I cannot figure out, how to update the app, so that it shows the streamed results.
Here is a minimal example. I expect the Counter
label to display the numbers from 0 to 9 very quickly. The result is, that I only get the number 9 after waiting for a second.
import time
from textual.app import App
from textual.widgets import Header, Label
from textual.reactive import reactive
def randomgen():
for i in range(10):
time.sleep(0.1)
yield str(i)
class Counter(Label):
countervalue = reactive("Press Enter to start")
def watch_countervalue(self, countervalue):
self.update(countervalue)
class Minimal(App):
def compose(self):
yield Header()
yield Counter(id="counter")
def on_key(self, event):
if event.key == "enter":
for i in randomgen():
self.query_one("#counter").countervalue = i # pyright: ignore
if __name__ == "__main__":
app = Minimal()
app.run()
You are updating countervalue
correctly, but there are other issues with your code that are preventing it from working.
The first is that you are using time.sleep
which is a blocking call. Blocking calls will prevent asyncio doing anything else. You can replace time.sleep
with await asyncio.sleep
.
import asyncio
from textual.app import App
from textual.widgets import Header, Label
from textual.reactive import reactive
def randomgen():
for i in range(10):
yield str(i)
class Counter(Label):
countervalue = reactive("Press Enter to start")
def watch_countervalue(self, countervalue):
self.update(countervalue)
class Minimal(App):
def compose(self):
yield Header()
yield Counter(id="counter")
async def on_key(self, event):
if event.key == "enter":
for i in randomgen():
self.query_one("#counter").countervalue = i # pyright: ignore
await asyncio.sleep(0.1)
if __name__ == "__main__":
app = Minimal()
app.run()
This works in the way I think you intended it to. Although you should avoid doing work that takes a while in message handlers, as your app won't be able to process messages until it completes.
It is better to have long running tasks in a worker. Here's an example:
import asyncio
from textual.app import App
from textual import work
from textual.widgets import Header, Label
from textual.reactive import reactive
class Counter(Label):
countervalue = reactive("Press Enter to start")
def watch_countervalue(self, countervalue):
self.update(countervalue)
class Minimal(App):
def compose(self):
yield Header()
yield Counter(id="counter")
@work
async def run_counter(self) -> None:
for i in range(10):
self.query_one("#counter", Counter).countervalue = str(i)
await asyncio.sleep(0.1)
async def on_key(self, event):
if event.key == "enter":
self.run_counter()
if __name__ == "__main__":
app = Minimal()
app.run()
You could also implement this with set_interval
.