pythonbokeh

Bokeh server: plot shared data in several sessions


I am relatively new to bokeh, but very familiar with python.
I want to use a Bokeh server to create a graphical live viewer for my data. I want to retrieve the data in a single thread and then pass them to all open documents / sessions.
What would be a good solution to do so? Is there even a good solution?

What I tried:

My current approach is to start the thread as soon as the server starts and, inside that thread, to iterate over all sessions and update their plots whenever new data are available. This works for one session, but as soon as I open a second session, I start getting errors (EDIT: see traceback below). Also, after loading the page, only the initial data are visible until new data arrive.

# app_hooks.py
import time
import threading

import bokeh.server.contexts
from bokeh.plotting import Document
from bokeh.models import ColumnDataSource

# Flag read by the background worker loop; the loop continues while it is True.
keep_running = True

# Shared x/y columns, pre-filled with some initial data.
data = dict(
    x=[1, 2, 3, 4, 5],
    y=[6, 7, 2, 4, 7],
)

def callback(document: Document, new_data: dict):
    """Replace the data of the renderer named "line" in *document*.

    The model named "line" is looked up in the document. If there is none,
    a message is printed and nothing is changed. Otherwise *new_data* is
    assigned as the complete data of that renderer's data source.
    """
    renderer = document.get_model_by_name("line")
    if renderer is None:
        print("Model not found in document")
    else:
        assert isinstance(renderer, bokeh.models.renderers.glyph_renderer.GlyphRenderer)
        source: ColumnDataSource = renderer.data_source
        # Disabled alternative kept from the original:
        # source.stream(new_data, rollover=100)
        source.data = new_data

def retrieve_data(i: int) -> dict[str, list[float]]:
    """Wait for the next sample, append it to the shared ``data`` and return it.

    The appended point is ``x = i`` and ``y = i * 0.5 % 5``. The returned
    object is the module-level ``data`` dict itself, not a copy.
    """
    # idea: use a PUB/SUB pattern to retrieve data
    time.sleep(1)  # it might take some time until new data is available. Can be seconds to hours.
    next_point = {'x': i, 'y': i * 0.5 % 5}
    for column, value in next_point.items():
        data[column].append(value)
    return data

def on_server_loaded(server_context: bokeh.server.contexts.BokehServerContext):
    """Start a daemon thread that schedules a plot update on every session.

    The thread repeatedly calls ``retrieve_data`` and, for each session in
    ``server_context.sessions``, registers a next-tick callback on the
    session's document that invokes ``callback``.
    """
    # If present, this function executes when the server starts.
    def add_data_worker():
        # Loop body of the background thread; runs until ``keep_running`` is False.
        global keep_running
        i=0
        while keep_running:
            print(f"Iteration {i + 1}")
            new_data = retrieve_data(i)
            for session in server_context.sessions:
                print(session.destroyed, session.expiration_requested)
                # NOTE(review): the lambda reads ``session`` and ``new_data`` only when
                # it is finally invoked, not when it is created. By then the loop may
                # have advanced, so a callback scheduled on one document can end up
                # calling callback() with a different session's document. Binding the
                # arguments at scheduling time, e.g. with
                # functools.partial(callback, session.document, new_data), avoids this.
                session.document.add_next_tick_callback(lambda: callback(session.document, new_data))
            i += 1

    # Daemon thread, so it does not keep the process alive on exit.
    thread = threading.Thread(target=add_data_worker, daemon=True)
    thread.start()

def on_server_unloaded(server_context: bokeh.server.contexts.BokehServerContext):
    """Clear the module-level ``keep_running`` flag; *server_context* is unused."""
    # If present, this function executes when the server shuts down.
    global keep_running
    # The worker loop tests this flag at the start of each iteration.
    keep_running = False
# main.py
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from .app_hooks import data

# Data source backed by the dict imported from app_hooks.
source = ColumnDataSource(data=data)

# The line renderer is named "line" so that it can be looked up by name later.
p = figure(
    title="Simple Line Example",
    x_axis_label='x',
    y_axis_label='y',
)
p.line(
    x="x",
    y="y",
    legend_label="My Value",
    line_width=2,
    source=source,
    name="line",
)

# Attach the figure to the current document.
curdoc().add_root(p)
Error message:
2025-06-23 08:43:14,863 Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x0000029D6617B1F0>>, <Task finished name='Task-204' coro=<Server
Session.with_document_locked() done, defined at C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\server\session.py:77> exception=RuntimeError('_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes')>)
Traceback (most recent call last):
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\tornado\ioloop.py", line 740, in _run_callback
    ret = callback()
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\tornado\ioloop.py", line 764, in _discard_future_result
    future.result()
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\server\session.py", line 94, in _needs_document_lock_wrapper
    result = func(self, *args, **kwargs)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\server\session.py", line 226, in with_document_locked
    return func(*args, **kwargs)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 495, in wrapper
    return invoke_with_curdoc(doc, invoke)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 453, in invoke_with_curdoc
    return f()
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 494, in invoke
    return f(*args, **kwargs)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 184, in remove_then_invoke
    return callback()
  File "C:\XtProgramFiles\python_libs_v3\dev\bokeh_tests\app_hooks.py", line 42, in <lambda>
    session.document.add_next_tick_callback(lambda: callback(session.document, new_data))
  File "C:\XtProgramFiles\python_libs_v3\dev\bokeh_tests\app_hooks.py", line 22, in callback
    source.data = new_data
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\core\has_props.py", line 336, in __setattr__
    return super().__setattr__(name, value)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\core\property\descriptors.py", line 761, in __set__
    self._set(obj, old, value, hint=hint, setter=setter)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\core\property\descriptors.py", line 614, in _set
    self._trigger(obj, old, value, hint=hint, setter=setter)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\core\property\descriptors.py", line 692, in _trigger
    obj.trigger(self.name, old, value, hint, setter)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\model\model.py", line 583, in trigger
    super().trigger(descriptor.name, old, new, hint=hint, setter=setter)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\util\callback_manager.py", line 177, in trigger
    self.document.callbacks.notify_change(cast(Model, self), attr, old, new, hint, setter, invoke)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 251, in notify_change
    self.trigger_on_change(event)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 423, in trigger_on_change
    invoke_with_curdoc(doc, invoke_callbacks)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 453, in invoke_with_curdoc
    return f()
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 422, in invoke_callbacks
    cb(event)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\callbacks.py", line 278, in <lambda>
    self._change_callbacks[receiver] = lambda event: event.dispatch(receiver)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\events.py", line 426, in dispatch
    super().dispatch(receiver)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\document\events.py", line 218, in dispatch
    cast(DocumentPatchedMixin, receiver)._document_patched(self)
  File "C:\XtProgramFiles\Miniconda3\envs\dev3\lib\site-packages\bokeh\server\session.py", line 244, in _document_patched
    raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes

Other ideas I had:

Are there other ideas? Are there best practices for similar problems? Did I even choose a suitable library for this?


Solution

  • I think I found solutions for my problems

    Error messages

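    Instead of using a lambda function inside add_next_tick_callback, I used partial(), as also suggested in the documentation. The lambda looks up session and new_data only when it is finally executed; by then the loop may already have moved on to another session, so the callback modifies a document whose lock is not held. partial() binds the current values immediately, so every callback updates the document it was scheduled on.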

    from functools import partial
    ...
    session.document.add_next_tick_callback(partial(callback, session.document, new_data))
    

    Visibility of initial data

    This part is a bit of a hack, as I couldn't find an intended way to share the state between sessions.
    The idea is to add the shared variable to the server context and retrieve it when a new document is created.

    # app_hooks.py
    
    data = {
        'x': [1, 2, 3, 4, 5],
        'y': [6, 7, 2, 4, 7]
    }
    
    ...
    
    def on_server_loaded(server_context: bokeh.server.contexts.BokehServerContext):
        # If present, this function executes when the server starts.
        global data
        server_context.global_data = {"data": data}
        ...
    
    # main.py
    
    from bokeh.plotting import figure, curdoc
    
    server_context = curdoc().session_context.server_context
    if hasattr(server_context, "global_data"):
        global_data: dict = server_context.global_data
    else:
        raise RuntimeError("global_data not found in server context")
    
    data = global_data["data"]
    ...
    

    HINT: This solution might be problematic if the Bokeh server runs with more than one process (option --num-procs). However, I am working on Windows, and Bokeh currently (v3.7.3) does not support more than one process on Windows, so this is not a problem for me.

    Overall Code

    Together with some other fixes, my overall code looks like this:

    # app_hooks.py
    import time
    import threading
    from functools import partial
    
    import bokeh.server.contexts
    from bokeh.plotting import Document
    from bokeh.models import ColumnDataSource
    
    # Flag read by the background worker loop; the loop continues while it is True.
    keep_running = True

    # Shared x/y columns with the initial data; also published on the server context.
    data = dict(
        x=[1, 2, 3, 4, 5],
        y=[6, 7, 2, 4, 7],
    )
    
    def callback(document: Document, new_data: dict):
        """Replace the data of the renderer named "line" in *document*.

        Args:
            document: Document that is searched for the model named "line".
            new_data: Column data assigned as the complete data of the
                renderer's data source.

        Raises:
            TypeError: If the model named "line" is not a GlyphRenderer.
        """
        model = document.get_model_by_name("line")
        if model is None:
            print("Model not found in document")
            return
        # Explicit check instead of ``assert`` so that it still runs under ``python -O``.
        if not isinstance(model, bokeh.models.renderers.glyph_renderer.GlyphRenderer):
            raise TypeError(
                f'model named "line" is a {type(model).__name__}, expected a GlyphRenderer'
            )
        source: ColumnDataSource = model.data_source
        # Disabled alternative kept from the original:
        # source.stream(new_data, rollover=100)
        source.data = new_data
    
    def retrieve_data(i: int) -> dict[str, list[float]]:
        """Wait for the next sample, append it to the shared ``data`` and return it.

        The appended point is ``x = i`` and ``y = i * 0.5 % 5``. The returned
        object is the module-level ``data`` dict itself, not a copy.
        """
        # idea: use a PUB/SUB pattern to retrieve data
        time.sleep(1)  # it might take some time until new data is available. Can be seconds to hours.
        next_point = {'x': i, 'y': i * 0.5 % 5}
        for column, value in next_point.items():
            data[column].append(value)
        return data
    
    def on_server_loaded(server_context: bokeh.server.contexts.BokehServerContext):
        """Publish the shared data on the server context and start the update thread.

        The module-level ``data`` dict is stored as
        ``server_context.global_data["data"]``. A daemon thread then repeatedly
        calls ``retrieve_data`` and schedules ``callback`` as a next-tick
        callback on the document of every usable session.
        """
        # If present, this function executes when the server starts.
        print("data id global", id(data))
        server_context.global_data = {"data": data}

        def schedule_update(session, new_data):
            """Schedule ``callback`` on the session's document, if it is still usable."""
            if session.destroyed or session.expiration_requested:
                return  # Skip destroyed or expired sessions
            try:
                # Even after the check above, session.document might not be
                # available if the session was destroyed in the meantime.
                doc = session.document
            except AttributeError:
                return
            # partial() binds doc and new_data now, not when the callback runs.
            doc.add_next_tick_callback(partial(callback, doc, new_data))

        def add_data_worker():
            """Fetch data and fan it out to all sessions until ``keep_running`` is False."""
            i = 0
            while keep_running:
                print(f"Iteration {i + 1}, doc count: {len(server_context.sessions)}")
                new_data = retrieve_data(i)
                for session in server_context.sessions:
                    schedule_update(session, new_data)
                i += 1

        worker = threading.Thread(target=add_data_worker, daemon=True)
        worker.start()
    
    def on_server_unloaded(server_context: bokeh.server.contexts.BokehServerContext):
        """Clear the module-level ``keep_running`` flag; *server_context* is unused."""
        # If present, this function executes when the server shuts down.
        global keep_running
        # The worker loop tests this flag at the start of each iteration.
        keep_running = False
    
    # main.py
    
    from bokeh.plotting import figure, curdoc
    from bokeh.models import ColumnDataSource
    
    # Fetch the shared state that on_server_loaded attached to the server context.
    server_context = curdoc().session_context.server_context
    if not hasattr(server_context, "global_data"):
        raise RuntimeError("global_data not found in server context")
    global_data: dict = server_context.global_data

    # Seed this document's data source from the shared dict.
    data = global_data["data"]
    print("data id", id(data))
    source = ColumnDataSource(data=data)

    # The line renderer is named "line" so that callback() can find it by name.
    p = figure(
        title="Simple Line Example",
        x_axis_label='x',
        y_axis_label='y',
    )
    p.line(
        x="x",
        y="y",
        legend_label="My Value",
        line_width=2,
        source=source,
        name="line",
    )

    curdoc().add_root(p)