
async filter function with generic typing


I'm trying to implement an async version of Python's built-in filter. Seems like an easy task, right?

from typing import AsyncIterator

async def simulated_data() -> AsyncIterator[int | None]:
    for i in [1, None, 3, 5]:
        yield i

async def afilter[T](predicate, iterable):
    async for item in iterable:
        # Like the built-in filter, a None predicate keeps the truthy items
        keep = item if predicate is None else predicate(item)
        if keep:
            yield item

b = afilter(None, simulated_data())
# or just this!
b = (it async for it in simulated_data() if it is not None)

Even a comprehension does the trick :D

But what about typing? The type of b is still reported as "AsyncGenerator[int | None, None]", even though the items can never be None.

I tried TypeGuard with no luck, so I looked at the type stub of the built-in filter, since this problem is already solved there:

class filter(Generic[_T]):
    @overload
    def __new__(cls, function: None, iterable: Iterable[_T | None], /) -> Self: ...
    @overload
    def __new__(cls, function: Callable[[_S], TypeGuard[_T]], iterable: Iterable[_S], /) -> Self: ...
    @overload
    def __new__(cls, function: Callable[[_S], TypeIs[_T]], iterable: Iterable[_S], /) -> Self: ...
    @overload
    def __new__(cls, function: Callable[[_T], Any], iterable: Iterable[_T], /) -> Self: ...
    def __iter__(self) -> Self: ...
    def __next__(self) -> _T: ...

It turns out filter is not even a function but a generic class. At this point the task doesn't look so easy. Does anyone have a solution (with generic types) by any chance?


Solution

  • A minimal example of an async filter can be defined with two overloads:

    from typing import AsyncGenerator, AsyncIterable, AsyncIterator, Callable, overload, reveal_type
    
    async def simulated_data() -> AsyncIterator[int | None]:
        for i in [1, None, 3, 5]:
            yield i
    
    # The overloads use plain def: an async def whose body has no yield is
    # typed as a coroutine that returns the generator, not as the generator itself.
    @overload
    def afilter[T](predicate: None, iterable: AsyncIterable[T | None]) -> AsyncGenerator[T, None]: ...
    
    @overload
    def afilter[T](predicate: Callable[[T], bool], iterable: AsyncIterable[T]) -> AsyncGenerator[T, None]: ...
    
    async def afilter[T](predicate: Callable[[T], bool] | None, iterable: AsyncIterable[T]) -> AsyncGenerator[T, None]:
        async for item in iterable:
            if predicate is None:
                # Like the built-in filter, a None predicate keeps the truthy items
                if item:
                    yield item
            elif predicate(item):
                yield item
    
    # No predicate
    only_int = afilter(None, simulated_data())
    reveal_type(only_int)  # AsyncGenerator[int, None]
    
    # Some predicate
    both = afilter(lambda data: False, simulated_data())
    reveal_type(both)  # AsyncGenerator[int | None, None]
    
    # Comprehension
    source = simulated_data()
    comprehension = (it async for it in source if it is not None)
    reveal_type(comprehension)  # AsyncGenerator[int, None]
    

    Note that a plain bool predicate gives no further narrowing: the result is just type T. To narrow the type further, you need additional overloads for the predicate, similar to those of the built-in filter:

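    from typing import TypeGuard, TypeIs  # TypeIs needs Python 3.13+
    
    # Place these before the general Callable[[T], bool] overload; otherwise
    # the general one matches first and nothing is narrowed.
    @overload
    def afilter[T, S](predicate: Callable[[S], TypeIs[T]], iterable: AsyncIterable[S]) -> AsyncGenerator[T, None]: ...
    
    @overload
    def afilter[T, S](predicate: Callable[[S], TypeGuard[T]], iterable: AsyncIterable[S]) -> AsyncGenerator[T, None]: ...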
    

    With these two you can, for example, define a custom predicate:

    def remove_none[T](value: T | None) -> TypeGuard[T]:
        return value is not None
    
    without_none = afilter(remove_none, simulated_data())
    reveal_type(without_none)  # AsyncGenerator[int, None]