I am relatively new to bokeh, but very familiar with python.
I want to use a Bokeh server to create a graphical live viewer for my data. I want to retrieve the data in a single thread and then pass them to all open documents / sessions.
What would be a good solution to do so? Is there even a good solution?
My current approach was to start the thread as soon as the server is started, and inside that iterate over all sessions to update the plots, as soon as new data are available. It is working for one session, but as soon as I open another session, I start getting errors (EDIT: see traceback below). Also, after loading the page there are only the initial data visible until new data arrive.
# app_hooks.py
import time
import threading
import bokeh.server.contexts
from bokeh.plotting import Document
from bokeh.models import ColumnDataSource
# Loop flag for the background worker: add_data_worker keeps iterating while this
# is True; on_server_unloaded sets it to False.
keep_running = True
# Shared dataset. retrieve_data appends to these lists in place, and main.py
# imports this dict to build its ColumnDataSource.
data = {  # some initial data
    'x': [1, 2, 3, 4, 5],
    'y': [6, 7, 2, 4, 7]
}
def callback(document: Document, new_data: dict):
    """Push *new_data* into the data source of the renderer named "line".

    The renderer is looked up in *document*; when there is none, a message is
    printed and the document is left untouched.
    """
    renderer = document.get_model_by_name("line")
    if renderer is not None:
        assert isinstance(renderer, bokeh.models.renderers.glyph_renderer.GlyphRenderer)
        source: ColumnDataSource = renderer.data_source
        # source.stream(new_data, rollover=100)
        source.data = new_data
    else:
        print("Model not found in document")
def retrieve_data(i: int) -> dict[str, list[float]]:
    """Wait for the next sample, append it to the shared ``data`` dict and return it.

    The returned object is the module-level ``data`` dict itself, not a copy.
    """
    # idea: use a PUB/SUB pattern to retrieve data
    # it might take some time until new data is available. Can be seconds to hours.
    time.sleep(1)
    data['x'].append(i)
    y_value = i * 0.5 % 5
    data['y'].append(y_value)
    return data
def on_server_loaded(server_context: bokeh.server.contexts.BokehServerContext):
    # If present, this function executes when the server starts.
    # Here it launches a daemon thread that repeatedly fetches data and schedules
    # an update of every session's document for each new dataset.
    def add_data_worker():
        # Body of the background thread; loops until keep_running becomes False.
        global keep_running
        i=0
        while keep_running:
            print(f"Iteration {i + 1}")
            # Blocks (time.sleep inside retrieve_data) until the next sample was appended.
            new_data = retrieve_data(i)
            for session in server_context.sessions:
                print(session.destroyed, session.expiration_requested)
                # NOTE(review): the lambda looks up `session` and `new_data` when it is
                # executed, not when it is created. With more than one session the loop
                # may already have rebound `session`, so the callback can end up touching
                # a different session's document than the one it was scheduled on --
                # presumably the cause of the "_pending_writes should be non-None"
                # RuntimeError. Binding the values at creation time (e.g. with
                # functools.partial) avoids this.
                session.document.add_next_tick_callback(lambda: callback(session.document, new_data))
            i += 1
    # daemon=True: the thread does not keep the process alive on exit.
    thread = threading.Thread(target=add_data_worker, daemon=True)
    thread.start()
def on_server_unloaded(server_context: bokeh.server.contexts.BokehServerContext):
    """Lifecycle hook: if present, this function executes when the server shuts down.

    Clears the module-level ``keep_running`` flag so the worker loop stops
    after its current iteration.
    """
    global keep_running
    keep_running = False
# main.py
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from .app_hooks import data
# Data source built from the dict imported from app_hooks (the initial data).
source = ColumnDataSource(data=data)

p = figure(
    title="Simple Line Example",
    x_axis_label='x',
    y_axis_label='y',
)
# name="line" is the name app_hooks.callback uses to look this renderer up.
p.line(
    x="x",
    y="y",
    source=source,
    name="line",
    line_width=2,
    legend_label="My Value",
)
curdoc().add_root(p)
2025-06-23 08:43:14,863 Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x0000029D6617B1F0>>, <Task finished name='Task-204' coro=<Server
Session.with_document_locked() done, defined at C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\server\session.py:77> exception=RuntimeError('_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes')>)
Traceback (most recent call last):
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\tornado\ioloop.py", line 740, in _run_callback
ret = callback()
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\tornado\ioloop.py", line 764, in _discard_future_result
future.result()
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\server\session.py", line 94, in _needs_document_lock_wrapper
result = func(self, *args, **kwargs)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\server\session.py", line 226, in with_document_locked
return func(*args, **kwargs)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 495, in wrapper
return invoke_with_curdoc(doc, invoke)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 453, in invoke_with_curdoc
return f()
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 494, in invoke
return f(*args, **kwargs)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 184, in remove_then_invoke
return callback()
File "C:\XtProgramFiles\python_libs_v3\dev\bokeh_tests\app_hooks.py", line 42, in <lambda>
session.document.add_next_tick_callback(lambda: callback(session.document, new_data))
File "C:\XtProgramFiles\python_libs_v3\dev\bokeh_tests\app_hooks.py", line 22, in callback
source.data = new_data
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\core\has_props.py", line 336, in __setattr__
return super().__setattr__(name, value)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\core\property\descriptors.py", line 761, in __set__
self._set(obj, old, value, hint=hint, setter=setter)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\core\property\descriptors.py", line 614, in _set
self._trigger(obj, old, value, hint=hint, setter=setter)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\core\property\descriptors.py", line 692, in _trigger
obj.trigger(self.name, old, value, hint, setter)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\model\model.py", line 583, in trigger
super().trigger(descriptor.name, old, new, hint=hint, setter=setter)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\util\callback_manager.py", line 177, in trigger
self.document.callbacks.notify_change(cast(Model, self), attr, old, new, hint, setter, invoke)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 251, in notify_change
self.trigger_on_change(event)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 423, in trigger_on_change
invoke_with_curdoc(doc, invoke_callbacks)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 453, in invoke_with_curdoc
return f()
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 422, in invoke_callbacks
cb(event)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 278, in <lambda>
self._change_callbacks[receiver] = lambda event: event.dispatch(receiver)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\events.py", line 426, in dispatch
super().dispatch(receiver)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\events.py", line 218, in dispatch
cast(DocumentPatchedMixin, receiver)._document_patched(self)
File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\server\session.py", line 244, in _document_patched
raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes
curdoc().add_periodic_callback(callback, 1000)
and inside that callback sending an HTTP request which gives me the current data. But I want to avoid polling, and I want to minimize the delay when new data are available, so I would have to poll very often. Are there other ideas? Are there best practices for similar problems? Did I even choose a good library for that?
I think I found solutions for my problems
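Instead of using a lambda function inside `add_next_tick_callback`, I used `functools.partial()`, as also suggested in the documentation. The lambda only looks up `session` and `new_data` when it is executed, by which time the loop may already have moved on to another session; `partial` binds the current values immediately.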
from functools import partial
...
session.document.add_next_tick_callback(partial(callback, session.document, new_data))
For the second problem (a freshly loaded page only shows the initial data until new data arrive), my solution is a bit of a hack, as I couldn't find an intended way to share state between sessions.
The idea is to add the shared variable to the server context and retrieve it when a new document is created.
# app_hooks.py
data = {
'x': [1, 2, 3, 4, 5],
'y': [6, 7, 2, 4, 7]
}
...
def on_server_loaded(server_context: bokeh.server.contexts.BokehServerContext):
# If present, this function executes when the server starts.
global data
server_context.global_data = {"data": data}
...
# main.py
from bokeh.plotting import figure, curdoc
server_context = curdoc().session_context.server_context
if hasattr(server_context, "global_data"):
global_data: dict = server_context.global_data
else:
raise RuntimeError("global_data not found in server context")
data = global_data["data"]
...
HINT: This solution might be problematic if the Bokeh server is running with more than one process (option `--num-procs`), but as I am working on Windows, and Bokeh currently (v3.7.3) does not support more than one process on Windows, this is not a problem for me.
Together with some other fixes, my overall code looks like this:
# app_hooks.py
import time
import threading
from functools import partial
import bokeh.server.contexts
from bokeh.plotting import Document
from bokeh.models import ColumnDataSource
# Loop flag for the background worker: add_data_worker keeps iterating while this
# is True; on_server_unloaded sets it to False.
keep_running = True
# Shared dataset. retrieve_data appends to these lists in place, and
# on_server_loaded publishes this dict as server_context.global_data["data"].
data = {
    'x': [1, 2, 3, 4, 5],
    'y': [6, 7, 2, 4, 7]
}
def callback(document: Document, new_data: dict):
    """Replace the data shown by the "line" renderer of *document* with *new_data*.

    If the document contains no model named "line", a message is printed and
    nothing is changed.
    """
    line_renderer = document.get_model_by_name("line")
    if line_renderer is None:
        print("Model not found in document")
        return

    assert isinstance(line_renderer, bokeh.models.renderers.glyph_renderer.GlyphRenderer)
    data_source: ColumnDataSource = line_renderer.data_source
    # Alternative kept for reference: data_source.stream(new_data, rollover=100)
    data_source.data = new_data
def retrieve_data(i: int) -> dict[str, list[float]]:
    """Block until the next sample exists, append it to ``data`` and return ``data``.

    The module-level ``data`` dict is extended in place and returned as-is
    (no copy), so every holder of that dict sees the new values.
    """
    # idea: use a PUB/SUB pattern to retrieve data
    time.sleep(1)  # it might take some time until new data is available. Can be seconds to hours.
    x_value = i
    data['x'].append(x_value)
    data['y'].append(i * 0.5 % 5)
    return data
def on_server_loaded(server_context: bokeh.server.contexts.BokehServerContext):
    """Lifecycle hook: if present, this function executes when the server starts.

    Publishes the shared ``data`` dict as ``server_context.global_data["data"]``
    and starts a daemon thread that fetches new data in a loop and schedules a
    ``callback`` run on the document of every live session.
    """
    print("data id global", id(data))
    server_context.global_data = {"data": data}

    def add_data_worker():
        # Thread body: runs until keep_running is set to False.
        iteration = 0
        while keep_running:
            print(f"Iteration {iteration + 1}, doc count: {len(server_context.sessions)}")
            new_data = retrieve_data(iteration)
            for session in server_context.sessions:
                # Only sessions that are neither destroyed nor expiring get an update.
                if not session.destroyed and not session.expiration_requested:
                    # Even after the check above, session.document might not be
                    # available if the session was destroyed in the meantime.
                    try:
                        doc = session.document
                    except AttributeError:
                        continue
                    # partial binds doc and new_data now, not when the callback runs.
                    doc.add_next_tick_callback(partial(callback, doc, new_data))
            iteration += 1

    threading.Thread(target=add_data_worker, daemon=True).start()
def on_server_unloaded(server_context: bokeh.server.contexts.BokehServerContext):
    """Lifecycle hook: if present, this function executes when the server shuts down.

    Sets the module-level ``keep_running`` flag to False, which ends the
    worker thread's ``while keep_running`` loop.
    """
    global keep_running
    keep_running = False
# main.py
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
# The shared data dict is fetched from the server context, where
# app_hooks.on_server_loaded stored it under "global_data".
server_context = curdoc().session_context.server_context
if not hasattr(server_context, "global_data"):
    raise RuntimeError("global_data not found in server context")
global_data: dict = server_context.global_data

data = global_data["data"]
print("data id", id(data))

source = ColumnDataSource(data=data)
p = figure(
    title="Simple Line Example",
    x_axis_label='x',
    y_axis_label='y',
)
# name="line" is the name app_hooks.callback uses to look this renderer up.
p.line(
    x="x",
    y="y",
    source=source,
    name="line",
    line_width=2,
    legend_label="My Value",
)
curdoc().add_root(p)