Taskiq - FastStream

This package is a wrapper that makes FastStream objects compatible with the Taskiq library.

Its main goal is to give FastStream access to Taskiq's task scheduling feature.

Installation

If you already have a FastStream project that interacts with your message broker, you can add scheduling to it by installing just taskiq-faststream:

pip install taskiq-faststream

If you are starting from a clean project, you can install taskiq-faststream together with a specific broker by using one of the following extras:

pip install taskiq-faststream[rabbit]
# or
pip install taskiq-faststream[kafka]
# or
pip install taskiq-faststream[confluent]
# or
pip install taskiq-faststream[nats]
# or
pip install taskiq-faststream[redis]

Usage

The package gives you two classes: AppWrapper and BrokerWrapper.

These are containers for the related FastStream objects that make them taskiq-compatible.

To create scheduled tasks for your broker, wrap it in BrokerWrapper and use it like a regular taskiq Broker.

# regular FastStream code
from faststream.nats import NatsBroker

broker = NatsBroker()

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# taskiq-faststream scheduling
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler

# wrap FastStream object
taskiq_broker = BrokerWrapper(broker)

# create periodic task
taskiq_broker.task(
    message="Hi!",
    # If you are using RabbitBroker, then you need to replace subject with queue.
    # If you are using KafkaBroker, then you need to replace subject with topic.
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)

# create scheduler object
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)
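The comments in the example note that the destination keyword changes with the broker: subject for NATS, queue for RabbitMQ, topic for Kafka. A minimal sketch of that mapping follows. The names `DESTINATION_KWARG` and `build_task_kwargs` are hypothetical helpers for illustration, not part of taskiq-faststream, and only the three brokers mentioned in those comments are covered.

```python
# Hypothetical helper, NOT part of taskiq-faststream. It only encodes the
# keyword names mentioned in the comments of the example above.
DESTINATION_KWARG = {
    "nats": "subject",
    "rabbit": "queue",
    "kafka": "topic",
}


def build_task_kwargs(broker_kind: str, destination: str, message: object, cron: str) -> dict:
    """Build keyword arguments intended for `taskiq_broker.task(**kwargs)`."""
    try:
        key = DESTINATION_KWARG[broker_kind]
    except KeyError:
        raise ValueError(f"unknown broker kind: {broker_kind!r}") from None
    # The schedule uses the same single-cron-label shape as the example above.
    return {"message": message, key: destination, "schedule": [{"cron": cron}]}


kwargs = build_task_kwargs("rabbit", "test-queue", "Hi!", "* * * * *")
print(kwargs)
```

With a wrapped RabbitBroker, the resulting dictionary could then be passed as `taskiq_broker.task(**kwargs)`, assuming the wrapper accepts the same keywords as in the NATS example.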

To run the scheduler, use the following command:

taskiq scheduler module:scheduler

You can also wrap your FastStream application the same way (this lets you use lifespan events and AsyncAPI documentation):

# regular FastStream code
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# wrap FastStream object
from taskiq_faststream import AppWrapper

taskiq_broker = AppWrapper(app)

# Code below omitted 👇

A small extra feature: instead of passing a fixed value as the message argument, you can pass a message callback that collects the information right before sending:

async def collect_information_to_send():
    return "Message to send"

taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)

You can also send multiple messages from a single task call by using a generator message callback with yield:

async def collect_information_to_send():
    """Sends 10 messages per task call."""
    for i in range(10):
        yield i

taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)
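To make the three message forms concrete (fixed value, callback, generator callback), here is a standard-library sketch of how a message argument could be expanded into the messages that get sent. This is not taskiq-faststream's implementation: `resolve_messages`, `collect`, `single`, and `many` are hypothetical names used only for illustration.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator


async def resolve_messages(message: Any) -> AsyncIterator[Any]:
    """Yield every message produced by a fixed value, a callback, or a generator callback."""
    if inspect.isasyncgenfunction(message):
        # Async generator callback: one message per yielded item.
        async for item in message():
            yield item
    elif inspect.iscoroutinefunction(message):
        # Async callback: a single message, computed at send time.
        yield await message()
    else:
        # Fixed value: sent as-is.
        yield message


async def single() -> str:
    return "Message to send"


async def many():
    for i in range(3):
        yield i


async def collect(message: Any) -> list:
    """Gather all resolved messages into a list for inspection."""
    return [item async for item in resolve_messages(message)]


print(asyncio.run(collect("Hi!")))
print(asyncio.run(collect(single)))
print(asyncio.run(collect(many)))
```

The point of the callback forms is that the message content is computed when the scheduled task fires, not when the task is declared.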