diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index acf71420e1858d41a0760c27a781d1f1bb5d0b0e..9a61bc1e8e82e4320e79ba49336d4d99b42aa726 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,11 +20,8 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu-latest, macos-latest, windows-latest]
-        python-version: [3.6, 3.7, 3.8, 3.9, pypy3]
-        # XML coverage reporting is broken for this particular configuration
-        exclude:
-          - os: windows-latest
-            python-version: pypy3
+        python-version: ["3.8", "3.9", "3.10", "3.11", "pypy-3.8"]
+
     env:
       OS: ${{ matrix.os }}
       PYTHON: ${{ matrix.python-version }}
@@ -34,6 +31,8 @@ jobs:
         uses: actions/setup-python@v2
         with:
           python-version: ${{ matrix.python-version }}
+      - name: Install test requirements
+        run: pip install -r test-requirements.txt
       - name: Run tests
         run: python setup.py test --addopts "--cov-report xml"
       - name: Upload coverage
@@ -53,6 +52,10 @@ jobs:
           python-version: 3.8
       - name: Build source distribution
         run: python setup.py sdist
+      - name: Build wheel
+        run: |
+          pip install wheel
+          python setup.py bdist_wheel
       - name: Publish source package on PyPI
         uses: pypa/gh-action-pypi-publish@master
         with:
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 1e454cc6d6faab49122f500fdc02f630d5920b60..5c4b5132063f085b8263e023ec3ba5762902d492 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -6,11 +6,23 @@ repos:
     -   id: end-of-file-fixer
     -   id: trailing-whitespace
 -   repo: https://github.com/ambv/black
-    rev: 20.8b1
+    rev: 23.3.0
     hooks:
     - id: black
       language_version: python3
--   repo: https://gitlab.com/pycqa/flake8
-    rev: 3.8.3
+-   repo: https://github.com/pycqa/flake8
+    rev: 5.0.4
     hooks:
     - id: flake8
+-   repo: https://github.com/pre-commit/mirrors-mypy
+    rev: v1.6.1
+    hooks:
+    -   id: mypy
+        files: ^(?!tests)
+        types: [python]
+-   repo: https://github.com/astral-sh/ruff-pre-commit
+    # Ruff version.
+    rev: v0.0.272
+    hooks:
+    - id: ruff
+      args: [--fix, --exit-non-zero-on-fix, --ignore, "E501,F403"]
diff --git a/.readthedocs.yaml b/.readthedocs.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..a0ffd31d693cf3e50fdc1421c99b169902c83755
--- /dev/null
+++ b/.readthedocs.yaml
@@ -0,0 +1,14 @@
+version: 2
+
+build:
+  os: ubuntu-22.04
+  tools:
+    python: "3.11"
+
+python:
+  install:
+    - method: pip
+      path: .
+
+sphinx:
+  configuration: docs/conf.py
diff --git a/.readthedocs.yml b/.readthedocs.yml
deleted file mode 100644
index 19fd9dd704d841e1a7554b149dc2778ad81d2e9c..0000000000000000000000000000000000000000
--- a/.readthedocs.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-build:
-    image: latest
-
-python:
-    version: 3.6
-    setup_py_install: true
diff --git a/aiostream/__init__.py b/aiostream/__init__.py
index aab8640c5888b209a28a305deda4cd3ac23305c9..3ac80c16bdbddeaf5ef26af175cb8990ec2597db 100644
--- a/aiostream/__init__.py
+++ b/aiostream/__init__.py
@@ -17,7 +17,7 @@ Some utility modules are also provided:
 
 from . import stream, pipe
 from .aiter_utils import async_, await_
-from .core import StreamEmpty, operator, streamcontext
+from .core import StreamEmpty, operator, pipable_operator, streamcontext
 
 __all__ = [
     "stream",
@@ -25,6 +25,7 @@ __all__ = [
     "async_",
     "await_",
     "operator",
+    "pipable_operator",
     "streamcontext",
     "StreamEmpty",
 ]
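
The package root now re-exports `pipable_operator` alongside `operator`. A
minimal sketch of defining a custom pipable operator through the updated
public API (the `double` operator below is illustrative, not part of the
library)::

    import asyncio
    from typing import AsyncIterable, AsyncIterator

    from aiostream import pipable_operator, pipe, stream, streamcontext


    @pipable_operator
    async def double(source: AsyncIterable[int]) -> AsyncIterator[int]:
        # Forward each item of the source, doubled.
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield 2 * item


    async def main() -> None:
        xs = stream.range(5) | double.pipe() | pipe.list()
        print(await xs)  # [0, 2, 4, 6, 8]


    asyncio.run(main())
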
diff --git a/aiostream/aiter_utils.py b/aiostream/aiter_utils.py
index a833259863df26a30bc94c592110f0581e9abdfc..f68ea846def63cbc1dff019de2dfe98e03ba835a 100644
--- a/aiostream/aiter_utils.py
+++ b/aiostream/aiter_utils.py
@@ -1,13 +1,28 @@
 """Utilities for asynchronous iteration."""
+from __future__ import annotations
+from types import TracebackType
 
 import warnings
 import functools
-from collections.abc import AsyncIterator
-
-try:
-    from contextlib import AsyncExitStack
-except ImportError:  # pragma: no cover
-    from async_exit_stack import AsyncExitStack
+from typing import (
+    TYPE_CHECKING,
+    AsyncContextManager,
+    AsyncGenerator,
+    AsyncIterable,
+    Awaitable,
+    Callable,
+    Type,
+    TypeVar,
+    AsyncIterator,
+    Any,
+)
+
+if TYPE_CHECKING:
+    from typing_extensions import ParamSpec
+
+    P = ParamSpec("P")
+
+from contextlib import AsyncExitStack
 
 __all__ = [
     "aiter",
@@ -27,13 +42,13 @@ __all__ = [
 # Magic method shortcuts
 
 
-def aiter(obj):
+def aiter(obj: AsyncIterable[T]) -> AsyncIterator[T]:
     """Access aiter magic method."""
     assert_async_iterable(obj)
     return obj.__aiter__()
 
 
-def anext(obj):
+def anext(obj: AsyncIterator[T]) -> Awaitable[T]:
     """Access anext magic method."""
     assert_async_iterator(obj)
     return obj.__anext__()
@@ -42,16 +57,16 @@ def anext(obj):
 # Async / await helper functions
 
 
-async def await_(obj):
+async def await_(obj: Awaitable[T]) -> T:
     """Identity coroutine function."""
     return await obj
 
 
-def async_(fn):
+def async_(fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
     """Wrap the given function into a coroutine function."""
 
     @functools.wraps(fn)
-    async def wrapper(*args, **kwargs):
+    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
         return await fn(*args, **kwargs)
 
     return wrapper
@@ -60,12 +75,12 @@ def async_(fn):
 # Iterability helpers
 
 
-def is_async_iterable(obj):
+def is_async_iterable(obj: object) -> bool:
     """Check if the given object is an asynchronous iterable."""
     return hasattr(obj, "__aiter__")
 
 
-def assert_async_iterable(obj):
+def assert_async_iterable(obj: object) -> None:
     """Raise a TypeError if the given object is not an
     asynchronous iterable.
     """
@@ -73,12 +88,12 @@ def assert_async_iterable(obj):
         raise TypeError(f"{type(obj).__name__!r} object is not async iterable")
 
 
-def is_async_iterator(obj):
+def is_async_iterator(obj: object) -> bool:
     """Check if the given object is an asynchronous iterator."""
     return hasattr(obj, "__anext__")
 
 
-def assert_async_iterator(obj):
+def assert_async_iterator(obj: object) -> None:
     """Raise a TypeError if the given object is not an
     asynchronous iterator.
     """
@@ -88,8 +103,11 @@ def assert_async_iterator(obj):
 
 # Async iterator context
 
+T = TypeVar("T")
+Self = TypeVar("Self", bound="AsyncIteratorContext[Any]")
+
 
-class AsyncIteratorContext(AsyncIterator):
+class AsyncIteratorContext(AsyncIterator[T], AsyncContextManager[Any]):
     """Asynchronous iterator with context management.
 
     The context management makes sure the aclose asynchronous method
@@ -111,7 +129,7 @@ class AsyncIteratorContext(AsyncIterator):
     _RUNNING = "RUNNING"
     _FINISHED = "FINISHED"
 
-    def __init__(self, aiterator):
+    def __init__(self, aiterator: AsyncIterator[T]):
         """Initialize with an asynchrnous iterator."""
         assert_async_iterator(aiterator)
         if isinstance(aiterator, AsyncIteratorContext):
@@ -119,10 +137,10 @@ class AsyncIteratorContext(AsyncIterator):
         self._state = self._STANDBY
         self._aiterator = aiterator
 
-    def __aiter__(self):
+    def __aiter__(self: Self) -> Self:
         return self
 
-    def __anext__(self):
+    def __anext__(self) -> Awaitable[T]:
         if self._state == self._FINISHED:
             raise RuntimeError(
                 f"{type(self).__name__} is closed and cannot be iterated"
@@ -134,7 +152,7 @@ class AsyncIteratorContext(AsyncIterator):
             )
         return anext(self._aiterator)
 
-    async def __aenter__(self):
+    async def __aenter__(self: Self) -> Self:
         if self._state == self._RUNNING:
             raise RuntimeError(f"{type(self).__name__} has already been entered")
         if self._state == self._FINISHED:
@@ -144,12 +162,16 @@ class AsyncIteratorContext(AsyncIterator):
         self._state = self._RUNNING
         return self
 
-    async def __aexit__(self, typ, value, traceback):
+    async def __aexit__(
+        self,
+        typ: Type[BaseException] | None,
+        value: BaseException | None,
+        traceback: TracebackType | None,
+    ) -> bool:
         try:
             if self._state == self._FINISHED:
                 return False
             try:
-
                 # No exception to throw
                 if typ is None:
                     return False
@@ -172,6 +194,7 @@ class AsyncIteratorContext(AsyncIterator):
 
                 # Throw
                 try:
+                    assert isinstance(self._aiterator, AsyncGenerator)
                     await self._aiterator.athrow(typ, value, traceback)
                     raise RuntimeError("Async iterator didn't stop after athrow()")
 
@@ -203,16 +226,20 @@ class AsyncIteratorContext(AsyncIterator):
         finally:
             self._state = self._FINISHED
 
-    async def aclose(self):
+    async def aclose(self) -> None:
         await self.__aexit__(None, None, None)
 
-    async def athrow(self, exc):
+    async def athrow(self, exc: Exception) -> T:
         if self._state == self._FINISHED:
             raise RuntimeError(f"{type(self).__name__} is closed and cannot be used")
-        return await self._aiterator.athrow(exc)
+        assert isinstance(self._aiterator, AsyncGenerator)
+        item: T = await self._aiterator.athrow(exc)
+        return item
 
 
-def aitercontext(aiterable, *, cls=AsyncIteratorContext):
+def aitercontext(
+    aiterable: AsyncIterable[T],
+) -> AsyncIteratorContext[T]:
     """Return an asynchronous context manager from an asynchronous iterable.
 
     The context management makes sure the aclose asynchronous method
@@ -228,12 +255,8 @@ def aitercontext(aiterable, *, cls=AsyncIteratorContext):
         async with aitercontext(ait) as safe_ait:
             async for item in safe_ait:
                 <block>
-
-    An optional subclass of AsyncIteratorContext can be provided.
-    This class will be used to wrap the given iterable.
     """
-    assert issubclass(cls, AsyncIteratorContext)
     aiterator = aiter(aiterable)
-    if isinstance(aiterator, cls):
+    if isinstance(aiterator, AsyncIteratorContext):
         return aiterator
-    return cls(aiterator)
+    return AsyncIteratorContext(aiterator)
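
With the `cls` parameter removed, `aitercontext` always wraps the iterator in
a plain `AsyncIteratorContext[T]`. A small sketch of the cleanup guarantee it
provides, assuming a local async generator::

    import asyncio
    from typing import AsyncIterator

    from aiostream.aiter_utils import aitercontext


    async def numbers() -> AsyncIterator[int]:
        try:
            for i in range(3):
                yield i
        finally:
            # Runs when the context calls aclose() on exit.
            print("closed")


    async def main() -> None:
        async with aitercontext(numbers()) as safe_ait:
            async for item in safe_ait:
                print(item)  # 0, 1, 2, then "closed"


    asyncio.run(main())
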
diff --git a/aiostream/core.py b/aiostream/core.py
index b2f33a1d903d1e963913d3ad1696b1a9af018f21..a11f08232b1dadd4c1ccecc444e90bcd5dff4bc7 100644
--- a/aiostream/core.py
+++ b/aiostream/core.py
@@ -1,11 +1,28 @@
 """Core objects for stream operators."""
+from __future__ import annotations
 
 import inspect
 import functools
-from collections.abc import AsyncIterable, Awaitable
+import sys
+import warnings
+
+from .aiter_utils import AsyncIteratorContext, aiter, assert_async_iterable
+from typing import (
+    Any,
+    AsyncIterator,
+    Callable,
+    Generator,
+    Iterator,
+    Protocol,
+    Union,
+    TypeVar,
+    cast,
+    AsyncIterable,
+    Awaitable,
+)
+
+from typing_extensions import ParamSpec, Concatenate
 
-from .aiter_utils import AsyncIteratorContext
-from .aiter_utils import aitercontext, assert_async_iterable
 
 __all__ = ["Stream", "Streamer", "StreamEmpty", "operator", "streamcontext"]
 
@@ -21,59 +38,49 @@ class StreamEmpty(Exception):
 
 # Helpers
 
+T = TypeVar("T")
+X = TypeVar("X")
+A = TypeVar("A", contravariant=True)
+P = ParamSpec("P")
+Q = ParamSpec("Q")
 
-async def wait_stream(aiterable):
+# Hack for python 3.8 compatibility
+if sys.version_info < (3, 9):
+    P = TypeVar("P")
+
+
+async def wait_stream(aiterable: BaseStream[T]) -> T:
     """Wait for an asynchronous iterable to finish and return the last item.
 
     The iterable is executed within a safe stream context.
     A StreamEmpty exception is raised if the sequence is empty.
     """
-    async with streamcontext(aiterable) as streamer:
-        async for item in streamer:
-            item
-        try:
-            return item
-        except NameError:
-            raise StreamEmpty()
-
-
-# Core objects
 
+    class Unassigned:
+        pass
 
-class Stream(AsyncIterable, Awaitable):
-    """Enhanced asynchronous iterable.
+    last_item: Unassigned | T = Unassigned()
 
-    It provides the following features:
+    async with streamcontext(aiterable) as streamer:
+        async for item in streamer:
+            last_item = item
 
-      - **Operator pipe-lining** - using pipe symbol ``|``
-      - **Repeatability** - every iteration creates a different iterator
-      - **Safe iteration context** - using ``async with`` and the ``stream``
-        method
-      - **Simplified execution** - get the last element from a stream using
-        ``await``
-      - **Slicing and indexing** - using square brackets ``[]``
-      - **Concatenation** - using addition symbol ``+``
+    if isinstance(last_item, Unassigned):
+        raise StreamEmpty()
+    return last_item
 
-    It is not meant to be instanciated directly.
-    Use the stream operators instead.
 
-    Example::
+# Core objects
 
-        xs = stream.count()    # xs is a stream object
-        ys = xs | pipe.skip(5) # pipe xs and skip the first 5 elements
-        zs = ys[5:10:2]        # slice ys using start, stop and step
 
-        async with zs.stream() as streamer:  # stream zs in a safe context
-            async for z in streamer:         # iterate the zs streamer
-                print(z)                     # Prints 10, 12, 14
+class BaseStream(AsyncIterable[T], Awaitable[T]):
+    """
+    Base class for streams.
 
-        result = await zs  # await zs and return its last element
-        print(result)      # Prints 14
-        result = await zs  # zs can be used several times
-        print(result)      # Prints 14
+    See `Stream` and `Streamer` for more information.
     """
 
-    def __init__(self, factory):
+    def __init__(self, factory: Callable[[], AsyncIterable[T]]) -> None:
         """Initialize the stream with an asynchronous iterable factory.
 
         The factory is a callable and takes no argument.
@@ -83,7 +90,9 @@ class Stream(AsyncIterable, Awaitable):
         assert_async_iterable(aiter)
         self._generator = self._make_generator(aiter, factory)
 
-    def _make_generator(self, first, factory):
+    def _make_generator(
+        self, first: AsyncIterable[T], factory: Callable[[], AsyncIterable[T]]
+    ) -> Iterator[AsyncIterable[T]]:
         """Generate asynchronous iterables when required.
 
         The first iterable is created beforehand for extra checking.
@@ -93,28 +102,21 @@ class Stream(AsyncIterable, Awaitable):
         while True:
             yield factory()
 
-    def __aiter__(self):
-        """Asynchronous iteration protocol.
-
-        Return a streamer context for safe iteration.
-        """
-        return streamcontext(next(self._generator))
-
-    def __await__(self):
+    def __await__(self) -> Generator[Any, None, T]:
         """Await protocol.
 
         Safely iterate and return the last element.
         """
         return wait_stream(self).__await__()
 
-    def __or__(self, func):
+    def __or__(self, func: Callable[[BaseStream[T]], X]) -> X:
         """Pipe protocol.
 
         Allow to pipe stream operators.
         """
         return func(self)
 
-    def __add__(self, value):
+    def __add__(self, value: AsyncIterable[X]) -> Stream[Union[X, T]]:
         """Addition protocol.
 
         Concatenate with a given asynchronous sequence.
@@ -123,7 +125,7 @@ class Stream(AsyncIterable, Awaitable):
 
         return chain(self, value)
 
-    def __getitem__(self, value):
+    def __getitem__(self, value: Union[int, slice]) -> Stream[T]:
         """Get item protocol.
 
         Accept index or slice to extract the corresponding item(s)
@@ -135,9 +137,43 @@ class Stream(AsyncIterable, Awaitable):
     # Disable sync iteration
     # This is necessary because __getitem__ is defined
     # which is a valid fallback for for-loops in python
-    __iter__ = None
+    __iter__: None = None
+
+
+class Stream(BaseStream[T]):
+    """Enhanced asynchronous iterable.
+
+    It provides the following features:
+
+      - **Operator pipe-lining** - using pipe symbol ``|``
+      - **Repeatability** - every iteration creates a different iterator
+      - **Safe iteration context** - using ``async with`` and the ``stream``
+        method
+      - **Simplified execution** - get the last element from a stream using
+        ``await``
+      - **Slicing and indexing** - using square brackets ``[]``
+      - **Concatenation** - using addition symbol ``+``
 
-    def stream(self):
+    It is not meant to be instantiated directly.
+    Use the stream operators instead.
+
+    Example::
+
+        xs = stream.count()    # xs is a stream object
+        ys = xs | pipe.skip(5) # pipe xs and skip the first 5 elements
+        zs = ys[5:10:2]        # slice ys using start, stop and step
+
+        async with zs.stream() as streamer:  # stream zs in a safe context
+            async for z in streamer:         # iterate the zs streamer
+                print(z)                     # Prints 10, 12, 14
+
+        result = await zs  # await zs and return its last element
+        print(result)      # Prints 14
+        result = await zs  # zs can be used several times
+        print(result)      # Prints 14
+    """
+
+    def stream(self) -> Streamer[T]:
         """Return a streamer context for safe iteration.
 
         Example::
@@ -150,11 +186,18 @@ class Stream(AsyncIterable, Awaitable):
         """
         return self.__aiter__()
 
+    def __aiter__(self) -> Streamer[T]:
+        """Asynchronous iteration protocol.
+
+        Return a streamer context for safe iteration.
+        """
+        return streamcontext(next(self._generator))
+
     # Advertise the proper syntax for entering a stream context
 
-    __aexit__ = None
+    __aexit__: None = None
 
-    async def __aenter__(self):
+    async def __aenter__(self) -> None:
         raise TypeError(
             "A stream object cannot be used as a context manager. "
             "Use the `stream` method instead: "
@@ -162,7 +205,7 @@ class Stream(AsyncIterable, Awaitable):
         )
 
 
-class Streamer(AsyncIteratorContext, Stream):
+class Streamer(AsyncIteratorContext[T], BaseStream[T]):
     """Enhanced asynchronous iterator context.
 
     It is similar to AsyncIteratorContext but provides the stream
@@ -181,7 +224,7 @@ class Streamer(AsyncIteratorContext, Stream):
     pass
 
 
-def streamcontext(aiterable):
+def streamcontext(aiterable: AsyncIterable[T]) -> Streamer[T]:
     """Return a stream context manager from an asynchronous iterable.
 
     The context management makes sure the aclose asynchronous method
@@ -205,13 +248,47 @@ def streamcontext(aiterable):
             async for item in streamer:
                 <block>
     """
-    return aitercontext(aiterable, cls=Streamer)
+    aiterator = aiter(aiterable)
+    if isinstance(aiterator, Streamer):
+        return aiterator
+    return Streamer(aiterator)
+
+
+# Operator type protocol
+
+
+class OperatorType(Protocol[P, T]):
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]:
+        ...
+
+    def raw(self, *args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]:
+        ...
+
+
+class PipableOperatorType(Protocol[A, P, T]):
+    def __call__(
+        self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
+    ) -> Stream[T]:
+        ...
+
+    def raw(
+        self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
+    ) -> AsyncIterator[T]:
+        ...
+
+    def pipe(
+        self, *args: P.args, **kwargs: P.kwargs
+    ) -> Callable[[AsyncIterable[A]], Stream[T]]:
+        ...
 
 
 # Operator decorator
 
 
-def operator(func=None, *, pipable=False):
+def operator(
+    func: Callable[P, AsyncIterator[T]] | None = None,
+    pipable: bool | None = None,
+) -> OperatorType[P, T]:
     """Create a stream operator from an asynchronous generator
     (or any function returning an asynchronous iterable).
 
 @@ -222,16 +299,122 @@ def operator(func=None, *, pipable=False):
             while True:
                 yield offset + width * random.random()
 
-    Decorator usage for pipable operators::
+    The return value is a dynamically created class.
+    It has the same name, module and doc as the original function.
+
+    A new stream is created by simply instantiating the operator::
+
+        xs = random()
 
-        @operator(pipable=True)
+    The original function is called at instantiation to check that
+    the signature matches. Other methods are available:
+
+      - `original`: the original function as a static method
+      - `raw`: same as original but with extra checking
+
+    The `pipable` argument is deprecated; use `pipable_operator` instead.
+    """
+
+    # Handle compatibility with legacy (aiostream <= 0.4)
+    if pipable is not None or func is None:
+        warnings.warn(
+            "The `pipable` argument is deprecated. Use either `@operator` or `@pipable_operator` directly.",
+            DeprecationWarning,
+        )
+    if func is None:
+        return pipable_operator if pipable else operator  # type: ignore
+    if pipable is True:
+        return pipable_operator(func)  # type: ignore
+
+    # First check for classmethod instance, to avoid more confusing errors later on
+    if isinstance(func, classmethod):
+        raise ValueError(
+            "An operator cannot be created from a class method, "
+            "since the decorated function becomes an operator class"
+        )
+
+    # Gather data
+    bases = (Stream,)
+    name = func.__name__
+    module = func.__module__
+    extra_doc = func.__doc__
+    doc = extra_doc or f"Regular {name} stream operator."
+
+    # Extract signature
+    signature = inspect.signature(func)
+    parameters = list(signature.parameters.values())
+    if parameters and parameters[0].name in ("self", "cls"):
+        raise ValueError(
+            "An operator cannot be created from a method, "
+            "since the decorated function becomes an operator class"
+        )
+
+    # Look for "more_sources"
+    for i, p in enumerate(parameters):
+        if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
+            more_sources_index = i
+            break
+    else:
+        more_sources_index = None
+
+    # Injected parameters
+    self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+
+    # Wrapped static method
+    original = func
+    original.__qualname__ = name + ".original"
+
+    # Raw static method
+    raw = func
+    raw.__qualname__ = name + ".raw"
+
+    # Init method
+    def init(self: BaseStream[T], *args: P.args, **kwargs: P.kwargs) -> None:
+        if more_sources_index is not None:
+            for source in args[more_sources_index:]:
+                assert_async_iterable(source)
+        factory = functools.partial(raw, *args, **kwargs)
+        return BaseStream.__init__(self, factory)
+
+    # Customize init signature
+    new_parameters = [self_parameter] + parameters
+    init.__signature__ = signature.replace(parameters=new_parameters)  # type: ignore[attr-defined]
+
+    # Customize init method
+    init.__qualname__ = name + ".__init__"
+    init.__name__ = "__init__"
+    init.__module__ = module
+    init.__doc__ = f"Initialize the {name} stream."
+
+    # Gather attributes
+    attrs = {
+        "__init__": init,
+        "__module__": module,
+        "__doc__": doc,
+        "raw": staticmethod(raw),
+        "original": staticmethod(original),
+    }
+
+    # Create operator class
+    return cast("OperatorType[P, T]", type(name, bases, attrs))
+
+
+def pipable_operator(
+    func: Callable[Concatenate[AsyncIterable[X], P], AsyncIterator[T]],
+) -> PipableOperatorType[X, P, T]:
+    """Create a pipable stream operator from an asynchronous generator
+    (or any function returning an asynchronous iterable).
+
+    Decorator usage::
+
+        @pipable_operator
         async def multiply(source, factor):
             async with streamcontext(source) as streamer:
                  async for item in streamer:
                      yield factor * item
 
-    In the case of pipable operators, the first argument is expected
-    to be the asynchronous iteratable used for piping.
+    The first argument is expected to be the asynchronous iterable used
+    for piping.
 
     The return value is a dynamically created class.
     It has the same name, module and doc as the original function.
@@ -242,8 +426,7 @@ def operator(func=None, *, pipable=False):
         ys = multiply(xs, 2)
 
     The original function is called at instantiation to check that
-    signature match. In the case of pipable operators, the source is
-    also checked for asynchronous iteration.
+    the signature matches. The source is also checked for asynchronous iteration.
 
     The operator also has a pipe class method that can be used along
     with the piping syntax::
@@ -260,123 +443,125 @@ def operator(func=None, *, pipable=False):
 
     The raw method is useful to create new operators from existing ones::
 
-        @operator(pipable=True)
+        @pipable_operator
         def double(source):
             return multiply.raw(source, 2)
     """
 
-    def decorator(func):
-        """Inner decorator for stream operator."""
-
-        # Gather data
-        bases = (Stream,)
-        name = func.__name__
-        module = func.__module__
-        extra_doc = func.__doc__
-        doc = extra_doc or f"Regular {name} stream operator."
-
-        # Extract signature
-        signature = inspect.signature(func)
-        parameters = list(signature.parameters.values())
-        if parameters and parameters[0].name in ("self", "cls"):
-            raise ValueError(
-                "An operator cannot be created from a method, "
-                "since the decorated function becomes an operator class"
-            )
-
-        # Look for "more_sources"
-        for i, p in enumerate(parameters):
-            if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
-                more_sources_index = i
-                break
-        else:
-            more_sources_index = None
-
-        # Injected parameters
-        self_parameter = inspect.Parameter(
-            "self", inspect.Parameter.POSITIONAL_OR_KEYWORD
+    # First check for classmethod instance, to avoid more confusing errors later on
+    if isinstance(func, classmethod):
+        raise ValueError(
+            "An operator cannot be created from a class method, "
+            "since the decorated function becomes an operator class"
         )
-        cls_parameter = inspect.Parameter(
-            "cls", inspect.Parameter.POSITIONAL_OR_KEYWORD
+
+    # Gather data
+    bases = (Stream,)
+    name = func.__name__
+    module = func.__module__
+    extra_doc = func.__doc__
+    doc = extra_doc or f"Regular {name} stream operator."
+
+    # Extract signature
+    signature = inspect.signature(func)
+    parameters = list(signature.parameters.values())
+    if parameters and parameters[0].name in ("self", "cls"):
+        raise ValueError(
+            "An operator cannot be created from a method, "
+            "since the decorated function becomes an operator class"
         )
 
-        # Wrapped static method
-        original = func
-        original.__qualname__ = name + ".original"
-
-        # Raw static method
-        raw = func
-        raw.__qualname__ = name + ".raw"
-
-        # Init method
-        def init(self, *args, **kwargs):
-            if pipable and args:
-                assert_async_iterable(args[0])
-            if more_sources_index is not None:
-                for source in args[more_sources_index:]:
-                    assert_async_iterable(source)
-            factory = functools.partial(self.raw, *args, **kwargs)
-            return Stream.__init__(self, factory)
-
-        # Customize init signature
-        new_parameters = [self_parameter] + parameters
-        init.__signature__ = signature.replace(parameters=new_parameters)
-
-        # Customize init method
-        init.__qualname__ = name + ".__init__"
-        init.__name__ = "__init__"
-        init.__module__ = module
-        init.__doc__ = f"Initialize the {name} stream."
-
-        if pipable:
-
-            # Raw static method
-            def raw(*args, **kwargs):
-                if args:
-                    assert_async_iterable(args[0])
-                if more_sources_index is not None:
-                    for source in args[more_sources_index:]:
-                        assert_async_iterable(source)
-                return func(*args, **kwargs)
-
-            # Custonize raw method
-            raw.__signature__ = signature
-            raw.__qualname__ = name + ".raw"
-            raw.__module__ = module
-            raw.__doc__ = doc
-
-            # Pipe class method
-            def pipe(cls, *args, **kwargs):
-                return lambda source: cls(source, *args, **kwargs)
-
-            # Customize pipe signature
-            if parameters and parameters[0].kind in (
-                inspect.Parameter.POSITIONAL_ONLY,
-                inspect.Parameter.POSITIONAL_OR_KEYWORD,
-            ):
-                new_parameters = [cls_parameter] + parameters[1:]
-            else:
-                new_parameters = [cls_parameter] + parameters
-            pipe.__signature__ = signature.replace(parameters=new_parameters)
-
-            # Customize pipe method
-            pipe.__qualname__ = name + ".pipe"
-            pipe.__module__ = module
-            pipe.__doc__ = f'Pipable "{name}" stream operator.'
-            if extra_doc:
-                pipe.__doc__ += "\n\n    " + extra_doc
-
-        # Gather attributes
-        attrs = {
-            "__init__": init,
-            "__module__": module,
-            "__doc__": doc,
-            "raw": staticmethod(raw),
-            "original": staticmethod(original),
-            "pipe": classmethod(pipe) if pipable else None,
-        }
-
-        # Create operator class
-        return type(name, bases, attrs)
-
-    return decorator if func is None else decorator(func)
+    # Look for "more_sources"
+    for i, p in enumerate(parameters):
+        if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
+            more_sources_index = i
+            break
+    else:
+        more_sources_index = None
+
+    # Injected parameters
+    self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+    cls_parameter = inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+
+    # Wrapped static method
+    original = func
+    original.__qualname__ = name + ".original"
+
+    # Raw static method
+    def raw(
+        arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
+    ) -> AsyncIterator[T]:
+        assert_async_iterable(arg)
+        if more_sources_index is not None:
+            for source in args[more_sources_index - 1 :]:
+                assert_async_iterable(source)
+        return func(arg, *args, **kwargs)
+
+    # Customize raw method
+    raw.__signature__ = signature  # type: ignore[attr-defined]
+    raw.__qualname__ = name + ".raw"
+    raw.__module__ = module
+    raw.__doc__ = doc
+
+    # Init method
+    def init(
+        self: BaseStream[T], arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
+    ) -> None:
+        assert_async_iterable(arg)
+        if more_sources_index is not None:
+            for source in args[more_sources_index - 1 :]:
+                assert_async_iterable(source)
+        factory = functools.partial(raw, arg, *args, **kwargs)
+        return BaseStream.__init__(self, factory)
+
+    # Customize init signature
+    new_parameters = [self_parameter] + parameters
+    init.__signature__ = signature.replace(parameters=new_parameters)  # type: ignore[attr-defined]
+
+    # Customize init method
+    init.__qualname__ = name + ".__init__"
+    init.__name__ = "__init__"
+    init.__module__ = module
+    init.__doc__ = f"Initialize the {name} stream."
+
+    # Pipe class method
+    def pipe(
+        cls: PipableOperatorType[X, P, T],
+        /,
+        *args: P.args,
+        **kwargs: P.kwargs,
+    ) -> Callable[[AsyncIterable[X]], Stream[T]]:
+        return lambda source: cls(source, *args, **kwargs)
+
+    # Customize pipe signature
+    if parameters and parameters[0].kind in (
+        inspect.Parameter.POSITIONAL_ONLY,
+        inspect.Parameter.POSITIONAL_OR_KEYWORD,
+    ):
+        new_parameters = [cls_parameter] + parameters[1:]
+    else:
+        new_parameters = [cls_parameter] + parameters
+    pipe.__signature__ = signature.replace(parameters=new_parameters)  # type: ignore[attr-defined]
+
+    # Customize pipe method
+    pipe.__qualname__ = name + ".pipe"
+    pipe.__module__ = module
+    pipe.__doc__ = f'Pipable "{name}" stream operator.'
+    if extra_doc:
+        pipe.__doc__ += "\n\n    " + extra_doc
+
+    # Gather attributes
+    attrs = {
+        "__init__": init,
+        "__module__": module,
+        "__doc__": doc,
+        "raw": staticmethod(raw),
+        "original": staticmethod(original),
+        "pipe": classmethod(pipe),  # type: ignore[arg-type]
+    }
+
+    # Create operator class
+    return cast(
+        "PipableOperatorType[X, P, T]",
+        type(name, bases, attrs),
+    )
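
With both decorators in place, `@operator` is meant for source operators and
`@pipable_operator` for operators that consume a stream, while the legacy
`@operator(pipable=True)` spelling keeps working behind a deprecation
warning. A sketch under these assumptions (both operators below are
illustrative)::

    import asyncio
    from typing import AsyncIterable, AsyncIterator

    from aiostream.core import operator, pipable_operator, streamcontext


    @operator
    async def numbers(n: int) -> AsyncIterator[int]:
        # A regular source operator: no piping, just instantiation.
        for i in range(n):
            yield i


    @pipable_operator
    async def multiply(source: AsyncIterable[int], factor: int) -> AsyncIterator[int]:
        # A pipable operator: the first argument is the piped source.
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield factor * item


    async def main() -> None:
        zs = numbers(3) | multiply.pipe(2)
        print(await zs)  # last element: 2 * 2 = 4


    asyncio.run(main())
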
diff --git a/aiostream/manager.py b/aiostream/manager.py
index b355bbc6dab0f215a338d037aa012ab63a3c67c4..bab224a54732850e42cf320a193b6f56786de2c8 100644
--- a/aiostream/manager.py
+++ b/aiostream/manager.py
@@ -1,44 +1,69 @@
 """Provide a context to easily manage several streamers running
 concurrently.
 """
+from __future__ import annotations
 
 import asyncio
 from .aiter_utils import AsyncExitStack
 
 from .aiter_utils import anext
 from .core import streamcontext
+from typing import (
+    TYPE_CHECKING,
+    Awaitable,
+    List,
+    Set,
+    Tuple,
+    Generic,
+    TypeVar,
+    Any,
+    Type,
+    AsyncIterable,
+)
+from types import TracebackType
+
+if TYPE_CHECKING:
+    from asyncio import Task
+    from aiostream.core import Streamer
+
+T = TypeVar("T")
 
 
 class TaskGroup:
-    def __init__(self):
-        self._pending = set()
+    def __init__(self) -> None:
+        self._pending: set[Task[Any]] = set()
 
-    async def __aenter__(self):
+    async def __aenter__(self) -> TaskGroup:
         return self
 
-    async def __aexit__(self, *args):
+    async def __aexit__(
+        self,
+        typ: Type[BaseException] | None,
+        value: BaseException | None,
+        traceback: TracebackType | None,
+    ) -> None:
         while self._pending:
             task = self._pending.pop()
             await self.cancel_task(task)
 
-    def create_task(self, coro):
+    def create_task(self, coro: Awaitable[T]) -> Task[T]:
         task = asyncio.ensure_future(coro)
         self._pending.add(task)
         return task
 
-    async def wait_any(self, tasks):
+    async def wait_any(self, tasks: List[Task[T]]) -> Set[Task[T]]:
         done, _ = await asyncio.wait(tasks, return_when="FIRST_COMPLETED")
         self._pending -= done
         return done
 
-    async def wait_all(self, tasks):
+    async def wait_all(self, tasks: List[Task[T]]) -> Set[Task[T]]:
         if not tasks:
             return set()
         done, _ = await asyncio.wait(tasks)
         self._pending -= done
         return done
 
-    async def cancel_task(self, task):
+    async def cancel_task(self, task: Task[Any]) -> None:
         try:
             # The task is already cancelled
             if task.cancelled():
@@ -67,19 +92,24 @@ class TaskGroup:
             self._pending.discard(task)
 
 
-class StreamerManager:
-    def __init__(self):
-        self.tasks = {}
-        self.streamers = []
-        self.group = TaskGroup()
+class StreamerManager(Generic[T]):
+    def __init__(self) -> None:
+        self.tasks: dict[Streamer[T], Task[T]] = {}
+        self.streamers: list[Streamer[T]] = []
+        self.group: TaskGroup = TaskGroup()
         self.stack = AsyncExitStack()
 
-    async def __aenter__(self):
+    async def __aenter__(self) -> StreamerManager[T]:
         await self.stack.__aenter__()
         await self.stack.enter_async_context(self.group)
         return self
 
-    async def __aexit__(self, *args):
+    async def __aexit__(
+        self,
+        typ: Type[BaseException] | None,
+        value: BaseException | None,
+        traceback: TracebackType | None,
+    ) -> bool:
         for streamer in self.streamers:
             task = self.tasks.pop(streamer, None)
             if task is not None:
@@ -87,35 +117,38 @@ class StreamerManager:
             self.stack.push_async_exit(streamer)
         self.tasks.clear()
         self.streamers.clear()
-        return await self.stack.__aexit__(*args)
+        return await self.stack.__aexit__(typ, value, traceback)
 
-    async def enter_and_create_task(self, aiter):
+    async def enter_and_create_task(self, aiter: AsyncIterable[T]) -> Streamer[T]:
         streamer = streamcontext(aiter)
         await streamer.__aenter__()
         self.streamers.append(streamer)
         self.create_task(streamer)
         return streamer
 
-    def create_task(self, streamer):
+    def create_task(self, streamer: Streamer[T]) -> None:
         assert streamer in self.streamers
         assert streamer not in self.tasks
         self.tasks[streamer] = self.group.create_task(anext(streamer))
 
-    async def wait_single_event(self, filters):
+    async def wait_single_event(
+        self, filters: list[Streamer[T]]
+    ) -> Tuple[Streamer[T], Task[T]]:
         tasks = [self.tasks[streamer] for streamer in filters]
         done = await self.group.wait_any(tasks)
         for streamer in filters:
             if self.tasks.get(streamer) in done:
                 return streamer, self.tasks.pop(streamer)
+        assert False
 
-    async def clean_streamer(self, streamer):
+    async def clean_streamer(self, streamer: Streamer[T]) -> None:
         task = self.tasks.pop(streamer, None)
         if task is not None:
             await self.group.cancel_task(task)
         await streamer.aclose()
         self.streamers.remove(streamer)
 
-    async def clean_streamers(self, streamers):
+    async def clean_streamers(self, streamers: list[Streamer[T]]) -> None:
         tasks = [
             self.group.create_task(self.clean_streamer(streamer))
             for streamer in streamers
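
Note that `TaskGroup` here is an internal helper, unrelated to
`asyncio.TaskGroup`. A rough sketch of the contract described by the new
annotations::

    import asyncio

    from aiostream.manager import TaskGroup


    async def main() -> None:
        async with TaskGroup() as group:
            # Tasks are tracked, and cancelled on exit if still pending.
            task = group.create_task(asyncio.sleep(0, result=42))
            done = await group.wait_all([task])
            print(done.pop().result())  # 42


    asyncio.run(main())
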
diff --git a/aiostream/pipe.py b/aiostream/pipe.py
index 7123d7d552e5c95bf45a287bb54c985d31cc874d..c27e101a1b5c584a66235b41e09c197a8e4c3ca0 100644
--- a/aiostream/pipe.py
+++ b/aiostream/pipe.py
@@ -1,20 +1,39 @@
 """Gather the pipe operators."""
+from __future__ import annotations
 
 from . import stream
 
-__all__ = []
-
-
-def update_pipe_module():
-    """Populate the pipe module dynamically."""
-    module_dir = __all__
-    operators = stream.__dict__
-    for key, value in operators.items():
-        if getattr(value, "pipe", None):
-            globals()[key] = value.pipe
-            if key not in module_dir:
-                module_dir.append(key)
-
-
-# Populate the module
-update_pipe_module()
+accumulate = stream.accumulate.pipe
+action = stream.action.pipe
+amap = stream.amap.pipe
+chain = stream.chain.pipe
+chunks = stream.chunks.pipe
+concat = stream.concat.pipe
+concatmap = stream.concatmap.pipe
+cycle = stream.cycle.pipe
+delay = stream.delay.pipe
+dropwhile = stream.dropwhile.pipe
+enumerate = stream.enumerate.pipe
+filter = stream.filter.pipe
+flatmap = stream.flatmap.pipe
+flatten = stream.flatten.pipe
+getitem = stream.getitem.pipe
+list = stream.list.pipe
+map = stream.map.pipe
+merge = stream.merge.pipe
+print = stream.print.pipe
+reduce = stream.reduce.pipe
+skip = stream.skip.pipe
+skiplast = stream.skiplast.pipe
+smap = stream.smap.pipe
+spaceout = stream.spaceout.pipe
+starmap = stream.starmap.pipe
+switch = stream.switch.pipe
+switchmap = stream.switchmap.pipe
+take = stream.take.pipe
+takelast = stream.takelast.pipe
+takewhile = stream.takewhile.pipe
+timeout = stream.timeout.pipe
+until = stream.until.pipe
+zip = stream.zip.pipe
+ziplatest = stream.ziplatest.pipe
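
The explicit aliases keep the runtime behavior identical while letting type
checkers resolve each pipe operator statically. Usage does not change; a
quick sketch::

    import asyncio

    from aiostream import pipe, stream


    async def main() -> None:
        xs = (
            stream.count(interval=0)  # 0, 1, 2, ...
            | pipe.take(10)
            | pipe.filter(lambda x: x % 2 == 0)
            | pipe.list()
        )
        print(await xs)  # [0, 2, 4, 6, 8]


    asyncio.run(main())
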
diff --git a/examples/__init__.py b/aiostream/py.typed
similarity index 100%
rename from examples/__init__.py
rename to aiostream/py.typed
diff --git a/aiostream/stream/advanced.py b/aiostream/stream/advanced.py
index dd23cf5a02b7274fb560ea3c6bd762214ab63bd1..106f0f1d2e7a25a8fd5dba9b2682ae3fd81802cd 100644
--- a/aiostream/stream/advanced.py
+++ b/aiostream/stream/advanced.py
@@ -1,18 +1,33 @@
 """Advanced operators (to deal with streams of higher order) ."""
+from __future__ import annotations
+
+from typing import AsyncIterator, AsyncIterable, TypeVar, Union, cast
+from typing_extensions import ParamSpec
 
 from . import combine
 
-from ..core import operator
+from ..core import Streamer, pipable_operator
 from ..manager import StreamerManager
 
+
 __all__ = ["concat", "flatten", "switch", "concatmap", "flatmap", "switchmap"]
 
 
+T = TypeVar("T")
+U = TypeVar("U")
+P = ParamSpec("P")
+
+
 # Helper to manage stream of higher order
 
 
-@operator(pipable=True)
-async def base_combine(source, switch=False, ordered=False, task_limit=None):
+@pipable_operator
+async def base_combine(
+    source: AsyncIterable[AsyncIterable[T]],
+    switch: bool = False,
+    ordered: bool = False,
+    task_limit: int | None = None,
+) -> AsyncIterator[T]:
     """Base operator for managing an asynchronous sequence of sequences.
 
     The sequences are awaited concurrently, although it's possible to limit
@@ -30,13 +45,13 @@ async def base_combine(source, switch=False, ordered=False, task_limit=None):
         raise ValueError("The task limit must be None or greater than 0")
 
     # Safe context
-    async with StreamerManager() as manager:
-
-        main_streamer = await manager.enter_and_create_task(source)
+    async with StreamerManager[Union[AsyncIterable[T], T]]() as manager:
+        main_streamer: Streamer[
+            AsyncIterable[T] | T
+        ] | None = await manager.enter_and_create_task(source)
 
         # Loop over events
         while manager.tasks:
-
             # Extract streamer groups
             substreamers = manager.streamers[1:]
             mainstreamers = [main_streamer] if main_streamer in manager.tasks else []
@@ -60,7 +75,6 @@ async def base_combine(source, switch=False, ordered=False, task_limit=None):
 
             # End of stream
             except StopAsyncIteration:
-
                 # Main streamer is finished
                 if streamer is main_streamer:
                     main_streamer = None
@@ -75,13 +89,13 @@ async def base_combine(source, switch=False, ordered=False, task_limit=None):
 
             # Process result
             else:
-
                 # Switch mechanism
                 if switch and streamer is main_streamer:
                     await manager.clean_streamers(substreamers)
 
                 # Setup a new source
                 if streamer is main_streamer:
+                    assert isinstance(result, AsyncIterable)
                     await manager.enter_and_create_task(result)
 
                     # Re-schedule the main streamer if task limit allows it
@@ -90,7 +104,8 @@ async def base_combine(source, switch=False, ordered=False, task_limit=None):
 
                 # Yield the result
                 else:
-                    yield result
+                    item = cast("T", result)
+                    yield item
 
                     # Re-schedule the streamer
                     manager.create_task(streamer)
@@ -99,8 +114,10 @@ async def base_combine(source, switch=False, ordered=False, task_limit=None):
 # Advanced operators (for streams of higher order)
 
 
-@operator(pipable=True)
-def concat(source, task_limit=None):
+@pipable_operator
+def concat(
+    source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
     """Given an asynchronous sequence of sequences, generate the elements
     of the sequences in order.
 
@@ -112,8 +129,10 @@ def concat(source, task_limit=None):
     return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=True)
 
 
-@operator(pipable=True)
-def flatten(source, task_limit=None):
+@pipable_operator
+def flatten(
+    source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
     """Given an asynchronous sequence of sequences, generate the elements
     of the sequences as soon as they're received.
 
@@ -125,8 +144,8 @@ def flatten(source, task_limit=None):
     return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=False)
 
 
-@operator(pipable=True)
-def switch(source):
+@pipable_operator
+def switch(source: AsyncIterable[AsyncIterable[T]]) -> AsyncIterator[T]:
     """Given an asynchronous sequence of sequences, generate the elements of
     the most recent sequence.
 
@@ -143,8 +162,13 @@ def switch(source):
 # Advanced *-map operators
 
 
-@operator(pipable=True)
-def concatmap(source, func, *more_sources, task_limit=None):
+@pipable_operator
+def concatmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
     """Apply a given function that creates a sequence from the elements of one
     or several asynchronous sequences, and generate the elements of the created
     sequences in order.
@@ -154,13 +178,17 @@ def concatmap(source, func, *more_sources, task_limit=None):
     although it's possible to limit the amount of running sequences using
     the `task_limit` argument.
     """
-    return concat.raw(
-        combine.smap.raw(source, func, *more_sources), task_limit=task_limit
-    )
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return concat.raw(mapped, task_limit=task_limit)
 
 
-@operator(pipable=True)
-def flatmap(source, func, *more_sources, task_limit=None):
+@pipable_operator
+def flatmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
     """Apply a given function that creates a sequence from the elements of one
     or several asynchronous sequences, and generate the elements of the created
     sequences as soon as they arrive.
@@ -172,13 +200,16 @@ def flatmap(source, func, *more_sources, task_limit=None):
 
     Errors raised in a source or output sequence are propagated.
     """
-    return flatten.raw(
-        combine.smap.raw(source, func, *more_sources), task_limit=task_limit
-    )
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return flatten.raw(mapped, task_limit=task_limit)
 
 
-@operator(pipable=True)
-def switchmap(source, func, *more_sources):
+@pipable_operator
+def switchmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
     """Apply a given function that creates a sequence from the elements of one
     or several asynchronous sequences and generate the elements of the most
     recently created sequence.
@@ -187,4 +218,5 @@ def switchmap(source, func, *more_sources):
     asynchronous sequence. Errors raised in a source or output sequence (that
     was not already closed) are propagated.
     """
-    return switch.raw(combine.smap.raw(source, func, *more_sources))
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return switch.raw(mapped)
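
Under the new signatures, the *-map operators expect a `SmapCallable` that
synchronously returns an async iterable. A sketch with `flatmap` (the
`duplicate` helper is illustrative)::

    import asyncio
    from typing import AsyncIterator

    from aiostream import pipe, stream


    async def duplicate(x: int) -> AsyncIterator[int]:
        # Emit each received value twice.
        yield x
        yield x


    async def main() -> None:
        xs = stream.range(3) | pipe.flatmap(lambda x, *_: duplicate(x)) | pipe.list()
        print(await xs)  # e.g. [0, 0, 1, 1, 2, 2]


    asyncio.run(main())
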
diff --git a/aiostream/stream/aggregate.py b/aiostream/stream/aggregate.py
index fd577fc56219edb4940f88316a60f2de62315421..8ed7c0e15f0d3c6d1f9e34f81fe5a7760ffd87b9 100644
--- a/aiostream/stream/aggregate.py
+++ b/aiostream/stream/aggregate.py
@@ -1,17 +1,27 @@
 """Aggregation operators."""
+from __future__ import annotations
 
 import asyncio
+import builtins
 import operator as op
+from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast
+
 
 from . import select
 from ..aiter_utils import anext
-from ..core import operator, streamcontext
+from ..core import pipable_operator, streamcontext
 
 __all__ = ["accumulate", "reduce", "list"]
 
+T = TypeVar("T")
+
 
-@operator(pipable=True)
-async def accumulate(source, func=op.add, initializer=None):
+@pipable_operator
+async def accumulate(
+    source: AsyncIterable[T],
+    func: Callable[[T, T], Awaitable[T] | T] = op.add,
+    initializer: T | None = None,
+) -> AsyncIterator[T]:
     """Generate a series of accumulated sums (or other binary function)
     from an asynchronous sequence.
 
@@ -33,14 +43,21 @@ async def accumulate(source, func=op.add, initializer=None):
         yield value
         # Iterate streamer
         async for item in streamer:
-            value = func(value, item)
+            returned = func(value, item)
             if iscorofunc:
-                value = await value
+                awaitable_value = cast("Awaitable[T]", returned)
+                value = await awaitable_value
+            else:
+                value = cast("T", returned)
             yield value
 
 
-@operator(pipable=True)
-def reduce(source, func, initializer=None):
+@pipable_operator
+def reduce(
+    source: AsyncIterable[T],
+    func: Callable[[T, T], Awaitable[T] | T],
+    initializer: T | None = None,
+) -> AsyncIterator[T]:
     """Apply a function of two arguments cumulatively to the items
     of an asynchronous sequence, reducing the sequence to a single value.
 
@@ -52,8 +69,8 @@ def reduce(source, func, initializer=None):
     return select.item.raw(acc, -1)
 
 
-@operator(pipable=True)
-async def list(source):
+@pipable_operator
+async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
     """Build a list from an asynchronous sequence.
 
     All the intermediate steps are generated, starting from the empty list.
@@ -65,7 +82,7 @@ async def list(source):
     .. note:: The same list object is produced at each step in order to avoid
     memory copies.
     """
-    result = []
+    result: builtins.list[T] = []
     yield result
     async with streamcontext(source) as streamer:
         async for item in streamer:
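
`accumulate` now casts the reducer's return value depending on whether the
reducer is a coroutine function, so plain and async reducers both keep
working. A sketch::

    import asyncio

    from aiostream import pipe, stream


    async def async_add(x: int, y: int) -> int:
        await asyncio.sleep(0)
        return x + y


    async def main() -> None:
        xs = stream.range(1, 5)  # 1, 2, 3, 4
        print(await (xs | pipe.accumulate()))       # 10 (last partial sum)
        print(await (xs | pipe.reduce(async_add)))  # 10
        print(await (xs | pipe.list()))             # [1, 2, 3, 4]


    asyncio.run(main())
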
diff --git a/aiostream/stream/combine.py b/aiostream/stream/combine.py
index 1d5c3fc71e81c02fbce31f0d9c58c3bc4c53ef5b..a782a730fd30975eae536912c1c09e57361d68af 100644
--- a/aiostream/stream/combine.py
+++ b/aiostream/stream/combine.py
@@ -1,35 +1,57 @@
 """Combination operators."""
+from __future__ import annotations
 
 import asyncio
 import builtins
 
+from typing import (
+    Awaitable,
+    Protocol,
+    TypeVar,
+    AsyncIterable,
+    AsyncIterator,
+    Callable,
+    cast,
+)
+from typing_extensions import ParamSpec
+
 from ..aiter_utils import AsyncExitStack, anext
-from ..core import operator, streamcontext
+from ..core import streamcontext, pipable_operator
 
 from . import create
 from . import select
 from . import advanced
 from . import aggregate
 
-__all__ = ["chain", "zip", "map", "merge", "ziplatest"]
+__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"]
+
+T = TypeVar("T")
+U = TypeVar("U")
+K = TypeVar("K")
+P = ParamSpec("P")
 
 
-@operator(pipable=True)
-async def chain(*sources):
+@pipable_operator
+async def chain(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
     """Chain asynchronous sequences together, in the order they are given.
 
     Note: the sequences are not iterated until they are required,
     so if the operation is interrupted, the remaining sequences
     will be left untouched.
     """
+    sources = source, *more_sources
     for source in sources:
         async with streamcontext(source) as streamer:
             async for item in streamer:
                 yield item
 
 
-@operator(pipable=True)
-async def zip(*sources):
+@pipable_operator
+async def zip(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[tuple[T, ...]]:
     """Combine and forward the elements of several asynchronous sequences.
 
     Each generated value is a tuple of elements, using the same order as
@@ -39,6 +61,17 @@ async def zip(*sources):
     Note: the different sequences are awaited in parallel, so that their
     waiting times don't add up.
     """
+    sources = source, *more_sources
+
+    # One source
+    if len(sources) == 1:
+        (source,) = sources
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield (item,)
+        return
+
+    # N sources
     async with AsyncExitStack() as stack:
         # Handle resources
         streamers = [
@@ -55,8 +88,31 @@ async def zip(*sources):
                 yield tuple(items)
 
 
-@operator(pipable=True)
-async def smap(source, func, *more_sources):
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class SmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+class AmapCallable(Protocol[X, Y]):
+    async def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+class MapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y:
+        ...
+
+
+@pipable_operator
+async def smap(
+    source: AsyncIterable[T],
+    func: SmapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
     """Apply a given function to the elements of one or several
     asynchronous sequences.
 
@@ -67,15 +123,20 @@ async def smap(source, func, *more_sources):
     Note: if more than one sequence is provided, they're awaited concurrently
     so that their waiting times don't add up.
     """
-    if more_sources:
-        source = zip(source, *more_sources)
-    async with streamcontext(source) as streamer:
+    stream = zip(source, *more_sources)
+    async with streamcontext(stream) as streamer:
         async for item in streamer:
-            yield func(*item) if more_sources else func(item)
+            yield func(*item)
 
 
-@operator(pipable=True)
-def amap(source, corofn, *more_sources, ordered=True, task_limit=None):
+@pipable_operator
+def amap(
+    source: AsyncIterable[T],
+    corofn: AmapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
     """Apply a given coroutine function to the elements of one or several
     asynchronous sequences.
 
@@ -94,8 +155,8 @@ def amap(source, corofn, *more_sources, ordered=True, task_limit=None):
     so that their waiting times don't add up.
     """
 
-    def func(*args):
-        return create.call(corofn, *args)
+    async def func(arg: T, *args: T) -> AsyncIterable[U]:
+        yield await corofn(arg, *args)
 
     if ordered:
         return advanced.concatmap.raw(
@@ -104,8 +165,14 @@ def amap(source, corofn, *more_sources, ordered=True, task_limit=None):
     return advanced.flatmap.raw(source, func, *more_sources, task_limit=task_limit)
 
 
-@operator(pipable=True)
-def map(source, func, *more_sources, ordered=True, task_limit=None):
+@pipable_operator
+def map(
+    source: AsyncIterable[T],
+    func: MapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
     """Apply a given function to the elements of one or several
     asynchronous sequences.
 
@@ -140,22 +207,32 @@ def map(source, func, *more_sources, ordered=True, task_limit=None):
         return amap.raw(
             source, func, *more_sources, ordered=ordered, task_limit=task_limit
         )
-    return smap.raw(source, func, *more_sources)
+    sync_func = cast("SmapCallable[T, U]", func)
+    return smap.raw(source, sync_func, *more_sources)
 
 
-@operator(pipable=True)
-def merge(*sources):
+@pipable_operator
+def merge(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
     """Merge several asynchronous sequences together.
 
     All the sequences are iterated simultaneously and their elements
     are forwarded as soon as they're available. The generation continues
     until all the sequences are exhausted.
     """
-    return advanced.flatten.raw(create.iterate.raw(sources))
-
-
-@operator(pipable=True)
-def ziplatest(*sources, partial=True, default=None):
+    sources = [source, *more_sources]
+    source_stream: AsyncIterable[AsyncIterable[T]] = create.iterate.raw(sources)
+    return advanced.flatten.raw(source_stream)
+
+
+@pipable_operator
+def ziplatest(
+    source: AsyncIterable[T],
+    *more_sources: AsyncIterable[T],
+    partial: bool = True,
+    default: T | None = None,
+) -> AsyncIterator[tuple[T | None, ...]]:
     """Combine several asynchronous sequences together, producing a tuple with
     the latest element of each sequence whenever a new element is received.
 
@@ -169,16 +246,21 @@ def ziplatest(*sources, partial=True, default=None):
     are forwarded as soon as they're available. The generation continues
     until all the sequences are exhausted.
     """
+    sources = source, *more_sources
     n = len(sources)
 
     # Custom getter
-    def getter(dct):
+    def getter(dct: dict[int, T]) -> Callable[[int], T | None]:
         return lambda key: dct.get(key, default)
 
     # Add source index to the items
-    new_sources = [
-        smap.raw(source, lambda x, i=i: {i: x}) for i, source in enumerate(sources)
-    ]
+    def make_func(i: int) -> SmapCallable[T, dict[int, T]]:
+        def func(x: T, *_: object) -> dict[int, T]:
+            return {i: x}
+
+        return func
+
+    new_sources = [smap.raw(source, make_func(i)) for i, source in enumerate(sources)]
 
     # Merge the sources
     merged = merge.raw(*new_sources)
@@ -194,4 +276,7 @@ def ziplatest(*sources, partial=True, default=None):
     )
 
     # Convert the state dict to a tuple
-    return smap.raw(filtered, lambda x: tuple(builtins.map(getter(x), range(n))))
+    def dict_to_tuple(x: dict[int, T], *_: object) -> tuple[T | None, ...]:
+        return tuple(builtins.map(getter(x), range(n)))
+
+    return smap.raw(filtered, dict_to_tuple)
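
With these signatures, callables passed to ``smap``/``amap``/``map`` must accept one positional argument per source, which is why the helpers above take a trailing ``*_``. A minimal sketch of the two dispatch paths (function names and values are illustrative, not part of this change)::

    import asyncio

    from aiostream import stream, pipe

    def add(x: int, *more: int) -> int:
        # Synchronous callable: map delegates to smap
        return x + sum(more)

    async def slow_double(x: int, *_: int) -> int:
        # Coroutine function: map delegates to amap
        await asyncio.sleep(0.1)
        return 2 * x

    async def main() -> None:
        xs = stream.range(5)
        ys = stream.range(0, 50, 10)
        # One positional argument per source: add(x, y)
        assert await stream.list(stream.map(xs, add, ys)) == [0, 11, 22, 33, 44]
        assert await (stream.range(3) | pipe.map(slow_double) | pipe.list()) == [0, 2, 4]

    asyncio.run(main())
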
diff --git a/aiostream/stream/create.py b/aiostream/stream/create.py
index 8ee7b851c299ca3da376ba4c96f1af3e694a618b..d7676b71a65450f78a54bbc1a96d78f346e73379 100644
--- a/aiostream/stream/create.py
+++ b/aiostream/stream/create.py
@@ -1,15 +1,25 @@
 """Non-pipable creation operators."""
+from __future__ import annotations
 
+import sys
 import asyncio
 import inspect
 import builtins
 import itertools
 
-from collections.abc import Iterable
+from typing import (
+    AsyncIterable,
+    Awaitable,
+    Iterable,
+    Protocol,
+    TypeVar,
+    AsyncIterator,
+    cast,
+)
+from typing_extensions import ParamSpec
 
 from ..stream import time
 from ..core import operator, streamcontext
-from ..aiter_utils import is_async_iterable
 
 __all__ = [
     "iterate",
@@ -24,12 +34,18 @@ __all__ = [
     "count",
 ]
 
+T = TypeVar("T")
+P = ParamSpec("P")
+
+# Hack for Python 3.8 compatibility
+if sys.version_info < (3, 9):
+    P = TypeVar("P")
 
 # Convert regular iterables
 
 
 @operator
-async def from_iterable(it):
+async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
     """Generate values from a regular iterable."""
     for item in it:
         await asyncio.sleep(0)
@@ -37,7 +53,7 @@ async def from_iterable(it):
 
 
 @operator
-def from_async_iterable(ait):
+def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]:
     """Generate values from an asynchronous iterable.
 
     Note: the corresponding iterator will be explicitly closed
@@ -46,9 +62,9 @@ def from_async_iterable(ait):
 
 
 @operator
-def iterate(it):
+def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]:
     """Generate values from a sychronous or asynchronous iterable."""
-    if is_async_iterable(it):
+    if isinstance(it, AsyncIterable):
         return from_async_iterable.raw(it)
     if isinstance(it, Iterable):
         return from_iterable.raw(it)
@@ -56,7 +72,7 @@ def iterate(it):
 
 
 @operator
-async def preserve(ait):
+async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]:
     """Generate values from an asynchronous iterable without
     explicitly closing the corresponding iterator."""
     async for item in ait:
@@ -67,7 +83,7 @@ async def preserve(ait):
 
 
 @operator
-async def just(value):
+async def just(value: T) -> AsyncIterator[T]:
     """Await if possible, and generate a single value."""
     if inspect.isawaitable(value):
         yield await value
@@ -75,20 +91,37 @@ async def just(value):
         yield value
 
 
+Y = TypeVar("Y", covariant=True)
+
+
+class SyncCallable(Protocol[P, Y]):
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y:
+        ...
+
+
+class AsyncCallable(Protocol[P, Y]):
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]:
+        ...
+
+
 @operator
-async def call(func, *args, **kwargs):
+async def call(
+    func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
+) -> AsyncIterator[T]:
     """Call the given function and generate a single value.
 
     Await if the provided function is asynchronous.
     """
     if asyncio.iscoroutinefunction(func):
-        yield await func(*args, **kwargs)
+        async_func = cast("AsyncCallable[P, T]", func)
+        yield await async_func(*args, **kwargs)
     else:
-        yield func(*args, **kwargs)
+        sync_func = cast("SyncCallable[P, T]", func)
+        yield sync_func(*args, **kwargs)
 
 
 @operator
-async def throw(exc):
+async def throw(exc: Exception) -> AsyncIterator[None]:
     """Throw an exception without generating any value."""
     if False:
         yield
@@ -96,18 +129,18 @@ async def throw(exc):
 
 
 @operator
-async def empty():
+async def empty() -> AsyncIterator[None]:
     """Terminate without generating any value."""
     if False:
         yield
 
 
 @operator
-async def never():
+async def never() -> AsyncIterator[None]:
     """Hang forever without generating any value."""
     if False:
         yield
-    future = asyncio.Future()
+    future: asyncio.Future[None] = asyncio.Future()
     try:
         await future
     finally:
@@ -115,7 +148,9 @@ async def never():
 
 
 @operator
-def repeat(value, times=None, *, interval=0):
+def repeat(
+    value: T, times: int | None = None, *, interval: float = 0.0
+) -> AsyncIterator[T]:
     """Generate the same value a given number of times.
 
     If ``times`` is ``None``, the value is repeated indefinitely.
@@ -131,7 +166,7 @@ def repeat(value, times=None, *, interval=0):
 
 
 @operator
-def range(*args, interval=0):
+def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]:
     """Generate a given range of numbers.
 
     It supports the same arguments as the builtin function.
@@ -142,7 +177,9 @@ def range(*args, interval=0):
 
 
 @operator
-def count(start=0, step=1, *, interval=0):
+def count(
+    start: int = 0, step: int = 1, *, interval: float = 0.0
+) -> AsyncIterator[int]:
     """Generate consecutive numbers indefinitely.
 
     Optional starting point and increment can be defined,
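
Following the ``SyncCallable``/``AsyncCallable`` split above, ``call`` keeps accepting both flavors and awaits only when given a coroutine function. A short sketch (the greeter functions are illustrative)::

    import asyncio

    from aiostream import stream

    def sync_greet(name: str) -> str:
        return f"Hello, {name}!"

    async def async_greet(name: str) -> str:
        await asyncio.sleep(0.1)
        return f"Hello, {name}!"

    async def main() -> None:
        # Both produce a single-element stream; awaiting returns that element
        assert await stream.call(sync_greet, "world") == "Hello, world!"
        assert await stream.call(async_greet, name="world") == "Hello, world!"

    asyncio.run(main())
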
diff --git a/aiostream/stream/misc.py b/aiostream/stream/misc.py
index 5120a59e297735f68be20fee318d5de8106eaefa..1be834e6e0c7c6857ad9404728a2b258c44b3b10 100644
--- a/aiostream/stream/misc.py
+++ b/aiostream/stream/misc.py
@@ -1,46 +1,83 @@
 """Extra operators."""
+from __future__ import annotations
+
 import asyncio
 import builtins
 
-from .transform import map
-from ..core import operator
+from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any
+
+from .combine import amap, smap
+from ..core import pipable_operator
 
 __all__ = ["action", "print"]
 
 
-@operator(pipable=True)
-def action(source, func):
+T = TypeVar("T")
+
+
+@pipable_operator
+def action(
+    source: AsyncIterable[T],
+    func: Callable[[T], Awaitable[Any] | Any],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[T]:
     """Perform an action for each element of an asynchronous sequence
     without modifying it.
 
     The given function can be synchronous or asynchronous.
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument. This argument is ignored if the
+    provided function is synchronous.
+
+    The coroutines run concurrently but their number can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially. This argument is ignored if the provided function
+    is synchronous.
     """
     if asyncio.iscoroutinefunction(func):
 
-        async def innerfunc(arg):
-            await func(arg)
+        async def ainnerfunc(arg: T, *_: object) -> T:
+            awaitable = func(arg)
+            assert isinstance(awaitable, Awaitable)
+            await awaitable
             return arg
 
+        return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)
+
     else:
 
-        def innerfunc(arg):
+        def innerfunc(arg: T, *_: object) -> T:
             func(arg)
             return arg
 
-    return map.raw(source, innerfunc)
+        return smap.raw(source, innerfunc)
 
 
-@operator(pipable=True)
-def print(source, template=None, **kwargs):
+@pipable_operator
+def print(
+    source: AsyncIterable[T],
+    template: str = "{}",
+    sep: str = " ",
+    end: str = "\n",
+    file: Any | None = None,
+    flush: bool = False,
+) -> AsyncIterator[T]:
     """Print each element of an asynchronous sequence without modifying it.
 
     An optional template can be provided to be formatted with the elements.
     All the keyword arguments are forwarded to the builtin function print.
     """
 
-    def func(value):
-        if template:
-            value = template.format(value)
-        builtins.print(value, **kwargs)
+    def func(value: T) -> None:
+        string = template.format(value)
+        builtins.print(
+            string,
+            sep=sep,
+            end=end,
+            file=file,
+            flush=flush,
+        )
 
     return action.raw(source, func)
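
Since ``action`` now forwards ``ordered`` and ``task_limit`` to ``amap``, slow asynchronous side effects can run concurrently while the elements pass through unchanged. A minimal sketch (the delay and limit are illustrative)::

    import asyncio

    from aiostream import stream, pipe

    async def slow_log(x: int) -> None:
        await asyncio.sleep(0.1)
        print("seen:", x)

    async def main() -> None:
        # At most two pending side effects at any time
        xs = stream.range(6) | pipe.action(slow_log, task_limit=2)
        assert await (xs | pipe.list()) == [0, 1, 2, 3, 4, 5]

    asyncio.run(main())
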
diff --git a/aiostream/stream/select.py b/aiostream/stream/select.py
index a7c65da0595d8ed3c564b117b22c9c4a4ca0cee2..9390f4642a47977b8d8b8499dcf8cf2e0f705bd9 100644
--- a/aiostream/stream/select.py
+++ b/aiostream/stream/select.py
@@ -1,12 +1,15 @@
 """Selection operators."""
+from __future__ import annotations
 
 import asyncio
 import builtins
 import collections
 
+from typing import Awaitable, Callable, TypeVar, AsyncIterable, AsyncIterator
+
 from . import transform
-from ..aiter_utils import anext
-from ..core import operator, streamcontext
+from ..aiter_utils import aiter, anext
+from ..core import streamcontext, pipable_operator
 
 __all__ = [
     "take",
@@ -20,15 +23,17 @@ __all__ = [
     "takewhile",
 ]
 
+T = TypeVar("T")
+
 
-@operator(pipable=True)
-async def take(source, n):
+@pipable_operator
+async def take(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
     """Forward the first ``n`` elements from an asynchronous sequence.
 
     If ``n`` is negative, it simply terminates before iterating the source.
     """
-    source = transform.enumerate.raw(source)
-    async with streamcontext(source) as streamer:
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
         if n <= 0:
             return
         async for i, item in streamer:
@@ -37,8 +42,8 @@ async def take(source, n):
                 return
 
 
-@operator(pipable=True)
-async def takelast(source, n):
+@pipable_operator
+async def takelast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
     """Forward the last ``n`` elements from an asynchronous sequence.
 
     If ``n`` is negative, it simply terminates after iterating the source.
@@ -46,7 +51,7 @@ async def takelast(source, n):
     Note: it is required to reach the end of the source before the first
     element is generated.
     """
-    queue = collections.deque(maxlen=n if n > 0 else 0)
+    queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
     async with streamcontext(source) as streamer:
         async for item in streamer:
             queue.append(item)
@@ -54,21 +59,21 @@ async def takelast(source, n):
             yield item
 
 
-@operator(pipable=True)
-async def skip(source, n):
+@pipable_operator
+async def skip(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
     """Forward an asynchronous sequence, skipping the first ``n`` elements.
 
     If ``n`` is negative, no elements are skipped.
     """
-    source = transform.enumerate.raw(source)
-    async with streamcontext(source) as streamer:
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
         async for i, item in streamer:
             if i >= n:
                 yield item
 
 
-@operator(pipable=True)
-async def skiplast(source, n):
+@pipable_operator
+async def skiplast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
     """Forward an asynchronous sequence, skipping the last ``n`` elements.
 
     If ``n`` is negative, no elements are skipped.
@@ -76,7 +81,7 @@ async def skiplast(source, n):
     Note: it is required to reach the ``n+1``-th element of the source
     before the first element is generated.
     """
-    queue = collections.deque(maxlen=n if n > 0 else 0)
+    queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
     async with streamcontext(source) as streamer:
         async for item in streamer:
             if n <= 0:
@@ -87,23 +92,25 @@ async def skiplast(source, n):
             queue.append(item)
 
 
-@operator(pipable=True)
-async def filterindex(source, func):
+@pipable_operator
+async def filterindex(
+    source: AsyncIterable[T], func: Callable[[int], bool]
+) -> AsyncIterator[T]:
     """Filter an asynchronous sequence using the index of the elements.
 
     The given function is synchronous, takes the index as an argument,
     and returns ``True`` if the corresponding element should be forwarded,
     ``False`` otherwise.
     """
-    source = transform.enumerate.raw(source)
-    async with streamcontext(source) as streamer:
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
         async for i, item in streamer:
             if func(i):
                 yield item
 
 
-@operator(pipable=True)
-def slice(source, *args):
+@pipable_operator
+def slice(source: AsyncIterable[T], *args: int) -> AsyncIterator[T]:
     """Slice an asynchronous sequence.
 
     The arguments are the same as the builtin type slice.
@@ -114,31 +121,32 @@ def slice(source, *args):
     """
     s = builtins.slice(*args)
     start, stop, step = s.start or 0, s.stop, s.step or 1
+    aiterator = aiter(source)
     # Filter the first items
     if start < 0:
-        source = takelast.raw(source, abs(start))
+        aiterator = takelast.raw(aiterator, abs(start))
     elif start > 0:
-        source = skip.raw(source, start)
+        aiterator = skip.raw(aiterator, start)
     # Filter the last items
     if stop is not None:
         if stop >= 0 and start < 0:
             raise ValueError("Positive stop with negative start is not supported")
         elif stop >= 0:
-            source = take.raw(source, stop - start)
+            aiterator = take.raw(aiterator, stop - start)
         else:
-            source = skiplast.raw(source, abs(stop))
+            aiterator = skiplast.raw(aiterator, abs(stop))
     # Filter step items
     if step is not None:
         if step > 1:
-            source = filterindex.raw(source, lambda i: i % step == 0)
+            aiterator = filterindex.raw(aiterator, lambda i: i % step == 0)
         elif step < 0:
             raise ValueError("Negative step not supported")
     # Return
-    return source
+    return aiterator
 
 
-@operator(pipable=True)
-async def item(source, index):
+@pipable_operator
+async def item(source: AsyncIterable[T], index: int) -> AsyncIterator[T]:
     """Forward the ``n``th element of an asynchronous sequence.
 
     The index can be negative and works like regular indexing.
@@ -166,8 +174,8 @@ async def item(source, index):
         yield result
 
 
-@operator(pipable=True)
-def getitem(source, index):
+@pipable_operator
+def getitem(source: AsyncIterable[T], index: int | builtins.slice) -> AsyncIterator[T]:
     """Forward one or several items from an asynchronous sequence.
 
     The argument can either be a slice or an integer.
@@ -180,8 +188,10 @@ def getitem(source, index):
     raise TypeError("Not a valid index (int or slice)")
 
 
-@operator(pipable=True)
-async def filter(source, func):
+@pipable_operator
+async def filter(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
     """Filter an asynchronous sequence using an arbitrary function.
 
     The function takes the item as an argument and returns ``True``
@@ -193,13 +203,16 @@ async def filter(source, func):
         async for item in streamer:
             result = func(item)
             if iscorofunc:
+                assert isinstance(result, Awaitable)
                 result = await result
             if result:
                 yield item
 
 
-@operator(pipable=True)
-async def until(source, func):
+@pipable_operator
+async def until(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
     """Forward an asynchronous sequence until a condition is met.
 
     Contrary to the ``takewhile`` operator, the last tested element is included
@@ -214,14 +227,17 @@ async def until(source, func):
         async for item in streamer:
             result = func(item)
             if iscorofunc:
+                assert isinstance(result, Awaitable)
                 result = await result
             yield item
             if result:
                 return
 
 
-@operator(pipable=True)
-async def takewhile(source, func):
+@pipable_operator
+async def takewhile(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
     """Forward an asynchronous sequence while a condition is met.
 
     Contrary to the ``until`` operator, the last tested element is not included
@@ -236,14 +252,17 @@ async def takewhile(source, func):
         async for item in streamer:
             result = func(item)
             if iscorofunc:
+                assert isinstance(result, Awaitable)
                 result = await result
             if not result:
                 return
             yield item
 
 
-@operator(pipable=True)
-async def dropwhile(source, func):
+@pipable_operator
+async def dropwhile(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
     """Discard the elements from an asynchronous sequence
     while a condition is met.
 
@@ -256,6 +275,7 @@ async def dropwhile(source, func):
         async for item in streamer:
             result = func(item)
             if iscorofunc:
+                assert isinstance(result, Awaitable)
                 result = await result
             if not result:
                 yield item
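
The ``slice`` refactoring above keeps the builtin semantics, and streams still support subscript syntax directly. A sketch of the two equivalent spellings (values are illustrative)::

    import asyncio

    from aiostream import stream, pipe

    async def main() -> None:
        # Operator form and subscript form produce the same elements
        xs = stream.range(10) | pipe.getitem(slice(1, 9, 2))
        ys = stream.range(10)[1:9:2]
        assert await stream.list(xs) == await stream.list(ys) == [1, 3, 5, 7]

    asyncio.run(main())
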
diff --git a/aiostream/stream/time.py b/aiostream/stream/time.py
index 72c2f27cc64c6897814e8ac1a598bad2eb2ec4c3..c9c0df88a309f2e604a46ccd6bd690a14f0c41fd 100644
--- a/aiostream/stream/time.py
+++ b/aiostream/stream/time.py
@@ -1,45 +1,36 @@
 """Time-specific operators."""
-
+from __future__ import annotations
 import asyncio
 
 from ..aiter_utils import anext
-from ..core import operator, streamcontext
+from ..core import streamcontext, pipable_operator
+
+from typing import TypeVar, AsyncIterable, AsyncIterator
 
 __all__ = ["spaceout", "delay", "timeout"]
 
 
-async def wait_for(aw, timeout):
-    task = asyncio.ensure_future(aw)
-    try:
-        return await asyncio.wait_for(task, timeout)
-    finally:
-        # Python 3.6 compatibility
-        if not task.done():  # pragma: no cover
-            task.cancel()
-            try:
-                await task
-            except asyncio.CancelledError:
-                pass
+T = TypeVar("T")
 
 
-@operator(pipable=True)
-async def spaceout(source, interval):
+@pipable_operator
+async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
     """Make sure the elements of an asynchronous sequence are separated
     in time by the given interval.
     """
-    timeout = 0
+    timeout = 0.0
     loop = asyncio.get_event_loop()
     async with streamcontext(source) as streamer:
         async for item in streamer:
             delta = timeout - loop.time()
-            delay = delta if delta > 0 else 0
+            delay = delta if delta > 0 else 0.0
             await asyncio.sleep(delay)
             yield item
             timeout = loop.time() + interval
 
 
-@operator(pipable=True)
-async def timeout(source, timeout):
+@pipable_operator
+async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
     """Raise a time-out if an element of the asynchronous sequence
     takes too long to arrive.
 
@@ -49,15 +40,15 @@ async def timeout(source, timeout):
     async with streamcontext(source) as streamer:
         while True:
             try:
-                item = await wait_for(anext(streamer), timeout)
+                item = await asyncio.wait_for(anext(streamer), timeout)
             except StopAsyncIteration:
                 break
             else:
                 yield item
 
 
-@operator(pipable=True)
-async def delay(source, delay):
+@pipable_operator
+async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
     """Delay the iteration of an asynchronous sequence."""
     await asyncio.sleep(delay)
     async with streamcontext(source) as streamer:
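
With the Python 3.6 shim removed, ``timeout`` relies on ``asyncio.wait_for`` directly and raises ``asyncio.TimeoutError`` when an element is late. A sketch of the failure mode (intervals are illustrative)::

    import asyncio

    from aiostream import stream, pipe

    async def main() -> None:
        # Elements arrive every 0.2 s but the timeout is 0.1 s
        xs = stream.count(interval=0.2) | pipe.timeout(0.1)
        try:
            await xs
        except asyncio.TimeoutError:
            print("stream timed out as expected")

    asyncio.run(main())
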
diff --git a/aiostream/stream/transform.py b/aiostream/stream/transform.py
index 961157fa521be4ce4338900523ab160b186d0653..f11bffa675c6b62c3f6ae159c106fcb098ad76c9 100644
--- a/aiostream/stream/transform.py
+++ b/aiostream/stream/transform.py
@@ -1,9 +1,19 @@
 """Transformation operators."""
 
+from __future__ import annotations
+
 import asyncio
 import itertools
+from typing import (
+    Protocol,
+    TypeVar,
+    AsyncIterable,
+    AsyncIterator,
+    Awaitable,
+    cast,
+)
 
-from ..core import operator, streamcontext
+from ..core import streamcontext, pipable_operator
 
 from . import select
 from . import create
@@ -15,9 +25,14 @@ __all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]
 # map, amap and smap are also transform operators
 map, amap, smap
 
+T = TypeVar("T")
+U = TypeVar("U")
+
 
-@operator(pipable=True)
-async def enumerate(source, start=0, step=1):
+@pipable_operator
+async def enumerate(
+    source: AsyncIterable[T], start: int = 0, step: int = 1
+) -> AsyncIterator[tuple[int, T]]:
     """Generate ``(index, value)`` tuples from an asynchronous sequence.
 
     This index is computed using a starting point and an increment,
@@ -29,8 +44,27 @@ async def enumerate(source, start=0, step=1):
             yield next(count), item
 
 
-@operator(pipable=True)
-def starmap(source, func, ordered=True, task_limit=None):
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class AsyncStarmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Awaitable[Y]:
+        ...
+
+
+class SyncStarmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+@pipable_operator
+def starmap(
+    source: AsyncIterable[tuple[T, ...]],
+    func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
     """Apply a given function to the unpacked elements of
     an asynchronous sequence.
 
@@ -47,20 +81,25 @@ def starmap(source, func, ordered=True, task_limit=None):
     is synchronous.
     """
     if asyncio.iscoroutinefunction(func):
+        async_func = cast("AsyncStarmapCallable[T, U]", func)
+
+        async def astarfunc(args: tuple[T, ...], *_: object) -> U:
+            awaitable = async_func(*args)
+            return await awaitable
 
-        async def starfunc(args):
-            return await func(*args)
+        return amap.raw(source, astarfunc, ordered=ordered, task_limit=task_limit)
 
     else:
+        sync_func = cast("SyncStarmapCallable[T, U]", func)
 
-        def starfunc(args):
-            return func(*args)
+        def starfunc(args: tuple[T, ...], *_: object) -> U:
+            return sync_func(*args)
 
-    return map.raw(source, starfunc, ordered=ordered, task_limit=task_limit)
+        return smap.raw(source, starfunc)
 
 
-@operator(pipable=True)
-async def cycle(source):
+@pipable_operator
+async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]:
     """Iterate indefinitely over an asynchronous sequence.
 
     Note: it does not perform any buffering, but re-iterates over
@@ -76,8 +115,8 @@ async def cycle(source):
             await asyncio.sleep(0)
 
 
-@operator(pipable=True)
-async def chunks(source, n):
+@pipable_operator
+async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]:
     """Generate chunks of size ``n`` from an asynchronous sequence.
 
     The chunks are lists, and the last chunk might contain fewer than ``n``
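
As with ``map``, the reworked ``starmap`` dispatches on ``asyncio.iscoroutinefunction``: coroutine functions go through ``amap``, plain callables through ``smap``. A short sketch of the synchronous path (values are illustrative)::

    import asyncio

    from aiostream import stream, pipe

    def add(x: int, y: int) -> int:
        return x + y

    async def main() -> None:
        pairs = stream.iterate([(1, 2), (3, 4)])
        assert await (pairs | pipe.starmap(add) | pipe.list()) == [3, 7]

    asyncio.run(main())
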
diff --git a/aiostream/test_utils.py b/aiostream/test_utils.py
index c76b230a0a0c798f56c12cdacb2d9f40d18723f4..56761d2e603ebbf490fbb61c3b3793385d0c496e 100644
--- a/aiostream/test_utils.py
+++ b/aiostream/test_utils.py
@@ -1,4 +1,5 @@
 """Utilities for testing stream operators."""
+from __future__ import annotations
 
 import asyncio
 from unittest.mock import Mock
@@ -6,12 +7,17 @@ from contextlib import contextmanager
 
 import pytest
 
-from .core import StreamEmpty, operator, streamcontext
+from .core import StreamEmpty, streamcontext, pipable_operator
+from typing import TYPE_CHECKING, Any, Callable, List
+
+if TYPE_CHECKING:
+    from _pytest.fixtures import SubRequest
+    from aiostream.core import Stream
 
 __all__ = ["add_resource", "assert_run", "event_loop"]
 
 
-@operator(pipable=True)
+@pipable_operator
 async def add_resource(source, cleanup_time):
     """Simulate an open resource in a stream operator."""
     try:
@@ -28,35 +34,48 @@ async def add_resource(source, cleanup_time):
             loop.open_resources -= 1
 
 
-def compare_exceptions(exc1, exc2):
+def compare_exceptions(
+    exc1: Exception,
+    exc2: Exception,
+) -> bool:
     """Compare two exceptions together."""
     return exc1 == exc2 or exc1.__class__ == exc2.__class__ and exc1.args == exc2.args
 
 
-async def assert_aiter(source, values, exception=None):
+async def assert_aiter(
+    source: Stream,
+    values: List[Any],
+    exception: Exception | None = None,
+) -> None:
     """Check the results of a stream using a streamcontext."""
     results = []
-    exception_type = type(exception) if exception else ()
+    exception_type = (type(exception),) if exception else ()
     try:
         async with streamcontext(source) as streamer:
             async for item in streamer:
                 results.append(item)
     except exception_type as exc:
+        assert exception is not None
         assert compare_exceptions(exc, exception)
     else:
         assert exception is None
     assert results == values
 
 
-async def assert_await(source, values, exception=None):
+async def assert_await(
+    source: Stream,
+    values: List[Any],
+    exception: Exception | None = None,
+) -> None:
     """Check the results of a stream using by awaiting it."""
-    exception_type = type(exception) if exception else ()
+    exception_type = (type(exception),) if exception else ()
     try:
         result = await source
     except StreamEmpty:
         assert values == []
         assert exception is None
     except exception_type as exc:
+        assert exception is not None
         assert compare_exceptions(exc, exception)
     else:
         assert result == values[-1]
@@ -64,7 +83,7 @@ async def assert_await(source, values, exception=None):
 
 
 @pytest.fixture(params=[assert_aiter, assert_await], ids=["aiter", "await"])
-def assert_run(request):
+def assert_run(request: SubRequest) -> Callable:
     """Parametrized fixture returning a stream runner."""
     return request.param
 
@@ -81,7 +100,6 @@ def event_loop():
     """
 
     class TimeTrackingTestLoop(asyncio.BaseEventLoop):
-
         stuck_threshold = 100
 
         def __init__(self):
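
For context, the ``assert_run`` fixture above is what the test suite uses to check streams both ways. A minimal sketch of a test relying on it, assuming pytest-asyncio is installed as pinned in test-requirements.txt (the test itself is illustrative)::

    import pytest

    from aiostream import stream
    from aiostream.test_utils import assert_run, event_loop  # noqa: F401

    @pytest.mark.asyncio
    async def test_range(assert_run):
        # Runs twice: once via async iteration, once by awaiting
        await assert_run(stream.range(3), [0, 1, 2])
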
diff --git a/docs/conf.py b/docs/conf.py
index 421889f0858752c7e91ecb096a4c09562fa6273c..1dd09c575e0f359a10e803eb5635dc16f0fe4b6e 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -5,7 +5,7 @@ VERSION = open("../setup.py").read().split('version="')[1].split('"')[0]
 project = "aiostream"
 version = VERSION
 author = "Vincent Michel"
-copyright = u"2016, Vincent Michel"
+copyright = "2016, Vincent Michel"
 
 master_doc = "index"
 highlight_language = "python"
diff --git a/docs/core.rst b/docs/core.rst
index dd4ef9af7bf62b9a87d2c54c91e9094d2b5d1f92..0aedf43edff1f7300e529566284c5401eb7dd052 100644
--- a/docs/core.rst
+++ b/docs/core.rst
@@ -16,7 +16,9 @@ Stream context manager
 .. autofunction:: streamcontext
 
 
-Operator decorator
-------------------
+Operator decorators
+-------------------
 
 .. autofunction:: operator
+
+.. autofunction:: pipable_operator
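
To illustrate what the new autodoc entry covers, here is a minimal custom operator written against ``pipable_operator`` (the ``double`` operator is illustrative, not part of the library)::

    import asyncio
    from typing import AsyncIterable, AsyncIterator

    from aiostream import pipable_operator, stream, streamcontext

    @pipable_operator
    async def double(source: AsyncIterable[int]) -> AsyncIterator[int]:
        """Double the elements of an asynchronous sequence."""
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield item * 2

    async def main() -> None:
        xs = stream.range(3) | double.pipe()
        assert await stream.list(xs) == [0, 2, 4]

    asyncio.run(main())
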
diff --git a/docs/requirements.txt b/docs/requirements.txt
deleted file mode 100644
index 0607b1efa78c2aabad20d54c4d61e26f999ee61c..0000000000000000000000000000000000000000
--- a/docs/requirements.txt
+++ /dev/null
@@ -1 +0,0 @@
-sphinx_rtd_theme>=0.3.1
diff --git a/examples/demo.py b/examples/demo.py
index 9d9e7fba082b08c316d1a9de72d9fc07ae49d619..e1352c6b064e4f9829fe1d66711aafdf72a2129a 100644
--- a/examples/demo.py
+++ b/examples/demo.py
@@ -1,24 +1,26 @@
 import asyncio
-from aiostream import stream, pipe
 
+from aiostream import pipe, stream
 
-async def main():
 
+def square(x: int, *_: int) -> int:
+    return x**2
+
+
+async def main() -> None:
     # Create a counting stream with a 0.2 seconds interval
     xs = stream.count(interval=0.2)
 
     # Operators can be piped using '|'
-    ys = xs | pipe.map(lambda x: x ** 2)
+    ys = xs | pipe.map(square)
 
     # Streams can be sliced
     zs = ys[1:10:2]
 
     # Use a stream context for proper resource management
     async with zs.stream() as streamer:
-
         # Asynchronous iteration
         async for z in streamer:
-
             # Print 1, 9, 25, 49 and 81
             print("->", z)
 
@@ -36,6 +38,4 @@ async def main():
 
 
 # Run main coroutine
-loop = asyncio.get_event_loop()
-loop.run_until_complete(main())
-loop.close()
+asyncio.run(main())
diff --git a/examples/extra.py b/examples/extra.py
index 606a23ac1fd0c1dd52049d403d2fba31c7baf454..5a2084e5db7d85e6b8dcd16db31ae215552b1489 100644
--- a/examples/extra.py
+++ b/examples/extra.py
@@ -1,32 +1,37 @@
 import asyncio
 import random as random_module
+from typing import AsyncIterable, AsyncIterator
 
-from aiostream import operator, pipe, streamcontext
+from aiostream import operator, pipable_operator, pipe, streamcontext
 
 
 @operator
-async def random(offset=0.0, width=1.0, interval=0.1):
+async def random(
+    offset: float = 0.0, width: float = 1.0, interval: float = 0.1
+) -> AsyncIterator[float]:
     """Generate a stream of random numbers."""
     while True:
         await asyncio.sleep(interval)
         yield offset + width * random_module.random()
 
 
-@operator(pipable=True)
-async def power(source, exponent):
+@pipable_operator
+async def power(
+    source: AsyncIterable[float], exponent: float | int
+) -> AsyncIterator[float]:
     """Raise the elements of an asynchronous sequence to the given power."""
     async with streamcontext(source) as streamer:
         async for item in streamer:
-            yield item ** exponent
+            yield item**exponent
 
 
-@operator(pipable=True)
-def square(source):
+@pipable_operator
+def square(source: AsyncIterable[float]) -> AsyncIterator[float]:
     """Square the elements of an asynchronous sequence."""
     return power.raw(source, 2)
 
 
-async def main():
+async def main() -> None:
     xs = (
         random()  # Stream random numbers
         | square.pipe()  # Square the values
@@ -37,6 +42,4 @@ async def main():
 
 
 # Run main coroutine
-loop = asyncio.get_event_loop()
-loop.run_until_complete(main())
-loop.close()
+asyncio.run(main())
diff --git a/examples/norm_server.py b/examples/norm_server.py
index a3ccd86811e85c4853ce42e100683de6b88282e6..a1283f29c7ef68460ddd49328363c59ec1b9c1cb 100644
--- a/examples/norm_server.py
+++ b/examples/norm_server.py
@@ -14,8 +14,10 @@ Test using netcat client:
     [...]
 """
 
+import math
 import asyncio
-from aiostream import stream, pipe
+
+from aiostream import pipe, stream
 
 # Constants
 
@@ -39,14 +41,24 @@ RESULT = """\
 # Client handler
 
 
-async def euclidean_norm_handler(reader, writer):
-
+async def euclidean_norm_handler(
+    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+) -> None:
     # Define lambdas
-    strip = lambda x: x.decode().strip()
-    nonempty = lambda x: x != ""
-    square = lambda x: x ** 2
-    write_cursor = lambda x: writer.write(b"> ")
-    square_root = lambda x: x ** 0.5
+    def strip(x: bytes, *_: object) -> str:
+        return x.decode().strip()
+
+    def nonempty(x: str) -> bool:
+        return x != ""
+
+    def square(x: float, *_: object) -> float:
+        return x**2
+
+    def write_cursor(_: float) -> None:
+        return writer.write(b"> ")
+
+    def square_root(x: float, *_: object) -> float:
+        return math.sqrt(x)
 
     # Create awaitable
     handle_request = (
@@ -58,7 +70,7 @@ async def euclidean_norm_handler(reader, writer):
         | pipe.map(square)
         | pipe.print("square: {:.2f}")
         | pipe.action(write_cursor)
-        | pipe.accumulate(initializer=0)
+        | pipe.accumulate(initializer=0.0)
         | pipe.map(square_root)
         | pipe.print("norm -> {:.2f}")
     )
@@ -77,27 +89,18 @@ async def euclidean_norm_handler(reader, writer):
 # Main function
 
 
-def run_server(bind="127.0.0.1", port=8888):
-
+async def main(bind: str = "127.0.0.1", port: int = 8888) -> None:
     # Start the server
-    loop = asyncio.get_event_loop()
-    coro = asyncio.start_server(euclidean_norm_handler, bind, port)
-    server = loop.run_until_complete(coro)
+    server = await asyncio.start_server(euclidean_norm_handler, bind, port)
 
     # Serve requests until Ctrl+C is pressed
     print("Serving on {}".format(server.sockets[0].getsockname()))
-    try:
-        loop.run_forever()
-    except KeyboardInterrupt:
-        pass
 
-    # Close the server
-    server.close()
-    loop.run_until_complete(server.wait_closed())
-    loop.close()
+    async with server:
+        await server.serve_forever()
 
 
 # Main execution
 
 if __name__ == "__main__":
-    run_server()
+    asyncio.run(main())
diff --git a/examples/preserve.py b/examples/preserve.py
index 50057d9056ff59996daa4a1c39d811df74d8f8e0..9fedb0b2a35c81022be182bcd1fa0678d6c62e27 100644
--- a/examples/preserve.py
+++ b/examples/preserve.py
@@ -1,9 +1,11 @@
 import asyncio
-from aiostream import stream, operator
+from typing import AsyncIterator
 
+from aiostream import operator, stream
 
-async def main():
-    async def agen():
+
+async def main() -> None:
+    async def agen() -> AsyncIterator[int]:
         yield 1
         yield 2
         yield 3
@@ -26,6 +28,4 @@ async def main():
 
 
 # Run main coroutine
-loop = asyncio.get_event_loop()
-loop.run_until_complete(main())
-loop.close()
+asyncio.run(main())
diff --git a/examples/simple.py b/examples/simple.py
index a8c904bc3d3755fb4fba1eb6732f9d9df8404b99..2d5d74c5e848e5a03229717d467d4652ad66091c 100644
--- a/examples/simple.py
+++ b/examples/simple.py
@@ -1,21 +1,28 @@
 import asyncio
-from aiostream import stream, pipe
 
+from aiostream import pipe, stream
 
-async def main():
+
+def is_odd(x: int) -> bool:
+    return x % 2 == 1
+
+
+def square(x: int, *_: object) -> int:
+    return x**2
+
+
+async def main() -> None:
     # This stream computes 11² + 13² in 1.5 second
     xs = (
         stream.count(interval=0.1)  # Count from zero every 0.1 s
         | pipe.skip(10)  # Skip the first 10 numbers
         | pipe.take(5)  # Take the following 5
-        | pipe.filter(lambda x: x % 2)  # Keep odd numbers
-        | pipe.map(lambda x: x ** 2)  # Square the results
+        | pipe.filter(is_odd)  # Keep odd numbers
+        | pipe.map(square)  # Square the results
         | pipe.accumulate()  # Add the numbers together
     )
     print("11² + 13² = ", await xs)
 
 
 # Run main coroutine
-loop = asyncio.get_event_loop()
-loop.run_until_complete(main())
-loop.close()
+asyncio.run(main())
diff --git a/setup.cfg b/setup.cfg
index 04a81f95ed97fdae3774417ec42c323454e2fa99..e9876efec7176e24432b4cc9d35de0de80f8bd19 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,9 +1,22 @@
 [tool:pytest]
 addopts = tests --strict-markers --cov aiostream --cov-report html --cov-report term
 
+[coverage:report]
+exclude_lines =
+    pragma: no cover
+    if TYPE_CHECKING:
+    ...
+
 [aliases]
 test = pytest
 
 [flake8]
 max-line-length = 88
-ignore = F401, F403, E731, W503
+ignore = F401, F403, E731, W503, E501, E203
+
+[mypy]
+strict = True
+packages = aiostream, examples
+
+[mypy-aiostream.test_utils]
+ignore_errors = True
diff --git a/setup.py b/setup.py
index f5a258ad59a790864620717645fb3529bb44115e..656be74218a6be170a580c67619e8343b1eb4c3d 100644
--- a/setup.py
+++ b/setup.py
@@ -1,22 +1,24 @@
 import sys
+
 from setuptools import setup
 
 TESTING = any(x in sys.argv for x in ["test", "pytest"])
 UPLOAD = "upload_sphinx" in sys.argv
 
-if not UPLOAD and sys.version_info < (3, 6):
-    raise RuntimeError("aiostream requires Python 3.6")
-
 with open("README.rst", encoding="utf-8") as f:
     long_description = f.read()
 
 setup(
     name="aiostream",
-    version="0.4.3",
+    version="0.5.2",
     packages=["aiostream", "aiostream.stream"],
+    python_requires=">=3.8",
+    install_requires=["typing-extensions"],
     setup_requires=["pytest-runner" if TESTING else ""],
-    install_requires=['async_exit_stack;python_version<"3.7"'],
     tests_require=["pytest", "pytest-asyncio", "pytest-cov"],
+    package_data={
+        "aiostream": ["py.typed"],
+    },
     description="Generator-based operators for asynchronous iteration",
     long_description=long_description,
     url="https://github.com/vxgmichel/aiostream",
@@ -24,10 +26,9 @@ setup(
     classifiers=[
         "Programming Language :: Python",
         "Programming Language :: Python :: 3",
-        "Programming Language :: Python :: 3.6",
-        "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: 3.8",
         "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
     ],
     author="Vincent Michel",
     author_email="vxgmichel@gmail.com",
diff --git a/test-requirements.txt b/test-requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..19f878e17aef414a52b4abe82d7bf5f3a32a1e16
--- /dev/null
+++ b/test-requirements.txt
@@ -0,0 +1,3 @@
+pytest
+pytest-asyncio
+pytest-cov
diff --git a/tests/test_advanced.py b/tests/test_advanced.py
index f171418644a9cc962cdd661cecbefb0e98dbca66..056354560618d29f287705a96f0304a8ddc14ed2 100644
--- a/tests/test_advanced.py
+++ b/tests/test_advanced.py
@@ -86,7 +86,6 @@ async def test_flatmap(assert_run, event_loop):
 
 @pytest.mark.asyncio
 async def test_switchmap(assert_run, event_loop):
-
     with event_loop.assert_cleanup():
         xs = stream.range(0, 30, 10, interval=3)
         ys = xs | pipe.switchmap(lambda x: stream.range(x, x + 5, interval=1))
diff --git a/tests/test_aiter.py b/tests/test_aiter.py
index 2df4d7c5507aede6d018b700c54b2199daf97109..319a2b21f7baa4bc05b4a0ea2ad4d6e059cdd0e3 100644
--- a/tests/test_aiter.py
+++ b/tests/test_aiter.py
@@ -56,9 +56,7 @@ class not_an_agen(list):
 
 @pytest.mark.asyncio
 async def test_simple_aitercontext(event_loop):
-
     async with aitercontext(agen()) as safe_gen:
-
         # Cannot enter twice
         with pytest.raises(RuntimeError):
             async with safe_gen:
diff --git a/tests/test_combine.py b/tests/test_combine.py
index 9df0f741b87e0c23d546ae08940fcb6852ef88e6..c21afda25110f06ef4be77e119e227af952535e2 100644
--- a/tests/test_combine.py
+++ b/tests/test_combine.py
@@ -30,11 +30,10 @@ async def test_zip(assert_run, event_loop):
 
 @pytest.mark.asyncio
 async def test_map(assert_run, event_loop):
-
     # Synchronous/simple
     with event_loop.assert_cleanup():
-        xs = stream.range(5) | pipe.map(lambda x: x ** 2)
-        expected = [x ** 2 for x in range(5)]
+        xs = stream.range(5) | pipe.map(lambda x: x**2)
+        expected = [x**2 for x in range(5)]
         await assert_run(xs, expected)
 
     # Synchronous/multiple
diff --git a/tests/test_core.py b/tests/test_core.py
index 1ebe1a816efee53b3b8c62fb5020d6e5400144bb..a97a35e53c35279f3ee2468aeb24bd5f51b8dec9 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -9,7 +9,6 @@ event_loop
 
 @pytest.mark.asyncio
 async def test_streamcontext(event_loop):
-
     with event_loop.assert_cleanup():
         xs = stream.range(3) | add_resource.pipe(1)
         async with streamcontext(xs) as streamer:
@@ -28,7 +27,6 @@ async def test_streamcontext(event_loop):
 
 
 def test_operator_from_method():
-
     with pytest.raises(ValueError):
 
         class A:
@@ -43,7 +41,7 @@ def test_operator_from_method():
             async def method(cls, arg):
                 yield 1
 
-    with pytest.raises(AttributeError):
+    with pytest.raises(ValueError):
 
         class C:
             @operator
@@ -78,3 +76,39 @@ async def test_error_on_entering_a_stream(event_loop):
             assert False
 
     assert "Use the `stream` method" in str(ctx.value)
+
+
+def test_compatibility():
+    @operator
+    async def test1():
+        yield 1
+
+    with pytest.raises(AttributeError):
+        test1.pipe
+
+    match = "The `pipable` argument is deprecated."
+    with pytest.warns(DeprecationWarning, match=match):
+
+        @operator()
+        async def test2():
+            yield 1
+
+    with pytest.raises(AttributeError):
+        test2.pipe
+
+    with pytest.warns(DeprecationWarning, match=match):
+
+        @operator(pipable=False)
+        async def test3():
+            yield 1
+
+    with pytest.raises(AttributeError):
+        test3.pipe
+
+    with pytest.warns(DeprecationWarning, match=match):
+
+        @operator(pipable=True)
+        async def test4(source):
+            yield 1
+
+    test4.pipe
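
The compatibility test above pins down the migration story: ``@operator()`` and ``@operator(pipable=...)`` still work but emit a ``DeprecationWarning``, and only pipable operators expose ``.pipe``. A sketch of observing the warning during a gradual migration (illustrative)::

    import warnings

    from aiostream import operator

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")

        @operator(pipable=True)  # deprecated spelling
        async def ones(source):
            yield 1

    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
    assert hasattr(ones, "pipe")  # pipable operators keep their .pipe method
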
diff --git a/tests/test_create.py b/tests/test_create.py
index d1db034e5e0ef3f5e97d5b185d65f436dfa3b9dd..3c6724d3008be43e86c2d0fea10e6fcf03af05f0 100644
--- a/tests/test_create.py
+++ b/tests/test_create.py
@@ -92,7 +92,7 @@ async def test_iterable(assert_run):
 async def test_async_iterable(assert_run, event_loop):
     async def agen():
         for x in range(2, 5):
-            yield await asyncio.sleep(1.0, result=x ** 2)
+            yield await asyncio.sleep(1.0, result=x**2)
 
     xs = stream.create.from_async_iterable(agen())
     await assert_run(xs, [4, 9, 16])
diff --git a/tests/test_pipe.py b/tests/test_pipe.py
new file mode 100644
index 0000000000000000000000000000000000000000..66f3596ba0c0cde8ecc57f2660ca8f0914919855
--- /dev/null
+++ b/tests/test_pipe.py
@@ -0,0 +1,10 @@
+from aiostream import stream, pipe
+
+
+def test_pipe_module():
+    for name in dir(stream):
+        obj = getattr(stream, name)
+        pipe_method = getattr(obj, "pipe", None)
+        if pipe_method is None:
+            continue
+        assert getattr(pipe, name) == pipe_method
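
The new test guarantees that the ``pipe`` module mirrors every operator's ``pipe`` method, so the two spellings below stay interchangeable (values are illustrative)::

    import asyncio

    from aiostream import stream, pipe

    async def main() -> None:
        xs = stream.range(10) | pipe.take(3)        # module form
        ys = stream.take.pipe(3)(stream.range(10))  # method form
        assert await stream.list(xs) == await stream.list(ys) == [0, 1, 2]

    asyncio.run(main())
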