This package is a thin wrapper that makes FastStream objects compatible with the Taskiq library.
Its main goal is to give FastStream access to Taskiq's task scheduling.
If you already have a FastStream project that talks to your message broker, you can add scheduling to it by installing just taskiq-faststream:

```bash
pip install taskiq-faststream
```

If you are starting a new project, you can install taskiq-faststream together with a FastStream broker by choosing one of the following extras:

```bash
# The quotes keep shells such as zsh from expanding the square brackets.
pip install "taskiq-faststream[rabbit]"
# or
pip install "taskiq-faststream[kafka]"
# or
pip install "taskiq-faststream[confluent]"
# or
pip install "taskiq-faststream[nats]"
# or
pip install "taskiq-faststream[redis]"
```

The package gives you two classes: `AppWrapper` and `BrokerWrapper`.
These are just containers that make the corresponding FastStream objects taskiq-compatible.
To schedule tasks for your broker, wrap it in `BrokerWrapper` and use it like a regular taskiq broker.

```python
# regular FastStream code
from faststream.nats import NatsBroker

broker = NatsBroker()

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# taskiq-faststream scheduling
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler

# wrap FastStream object
taskiq_broker = BrokerWrapper(broker)

# create periodic task
taskiq_broker.task(
    message="Hi!",
    # If you are using RabbitBroker, then you need to replace subject with queue.
    # If you are using KafkaBroker, then you need to replace subject with topic.
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)

# create scheduler object
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)
```

To run the scheduler, use the following command:

```bash
taskiq scheduler module:scheduler
```

You can also wrap your FastStream application the same way, which lets you use lifespan events and AsyncAPI documentation:

```python
# regular FastStream code
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# wrap FastStream object
from taskiq_faststream import AppWrapper

taskiq_broker = AppWrapper(app)

# Code below omitted 👇
```

A small extra feature: instead of passing a fixed value as the message argument, you can pass a message callback that collects the information right before sending:

```python
async def collect_information_to_send():
    return "Message to send"

taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)
```

You can also send multiple messages from a single task call by using a generator message callback with `yield`:

```python
async def collect_information_to_send():
    """Sends 10 messages per task call."""
    for i in range(10):
        yield i

taskiq_broker.task(
    message=collect_information_to_send,
    ...,
)
```
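
To make the three message forms concrete (a plain value, a callback, and a generator callback), here is a minimal, library-free sketch of how a scheduler could turn each form into the payloads it sends. The helper `resolve_messages` and the `demo` function are hypothetical names invented for this illustration; they are not part of taskiq-faststream and do not describe its actual internals.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator


async def resolve_messages(message: Any) -> AsyncIterator[Any]:
    """Yield every payload that one task call would send.

    Hypothetical helper for illustration only; not the library's code.
    """
    if inspect.isasyncgenfunction(message):
        # Async generator callback: one payload per `yield`.
        async for item in message():
            yield item
    elif inspect.isgeneratorfunction(message):
        # Sync generator callback: one payload per `yield`.
        for item in message():
            yield item
    elif inspect.iscoroutinefunction(message):
        # Async callback: await it to get a single payload.
        yield await message()
    elif callable(message):
        # Sync callback: call it to get a single payload.
        yield message()
    else:
        # Plain value: send it as-is.
        yield message


async def collect_information_to_send():
    return "Message to send"


async def collect_many():
    for i in range(3):
        yield i


async def demo() -> list:
    sent = []
    for source in ("Hi!", collect_information_to_send, collect_many):
        async for payload in resolve_messages(source):
            sent.append(payload)
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['Hi!', 'Message to send', 0, 1, 2]
```

The point of the sketch is only that a callback is evaluated at send time rather than at definition time, so each scheduled run can produce fresh data, and that a generator callback fans out into one message per yielded item.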