Reactive event bus for PHP powered by RxPHP and Swoole.
It lets you publish/subscribe domain and infrastructure events, compose pipelines with Rx operators, and run time-based operators on Swoole’s event loop.
- EventBus — simple Rx‐backed bus with
on()
,onMany()
,payloads()
,once()
,request()
- SwooleScheduler —
AsyncSchedulerInterface
usingSwoole\Timer
(works with RxPHP time operators) - Event model —
BasicEvent
(name, payload, meta, rid) andEventInterface
(correlation id)
Requirements
- PHP 8.3+
-
ext-swoole
4.8+ / 5.x -
reactivex/rxphp
(2.x)
Installation
composer require small/swoole-rx-events
Quick start
use Small\SwooleRxEvents\EventBus; use Small\SwooleRxEvents\SwooleScheduler; use Small\SwooleRxEvents\Event\BasicEvent; // Use the Swoole async scheduler $bus = new EventBus(new SwooleScheduler()); // Subscribe by name $bus->on('order.created')->subscribe(function ($e) { echo "order rid={$e->getRid()} payload=", json_encode($e->getPayload()), PHP_EOL; }); // Emit an event $bus->emitName('order.created', ['id' => 123]); // If you’re in a plain CLI script, keep the loop alive briefly: \Swoole\Timer::after(20, fn () => \Swoole\Event::exit()); \Swoole\Event::wait();
Concepts
Event
All event must implement EventInterface
namespace Small\SwooleRxEvents\Contract; interface EventInterface { public function getName(): string; public function getRid(): string; public function setRid(string $rid): self; }
BasicEvent
carries:
-
name
(string) -
payload
(array) -
meta
(array, e.g. tracing, user) -
rid
(string, auto‐generated correlation id)
Bus
-
stream()
— all events -
on($name)
/onMany([...])
— filtered streams -
payloads($name)
— payload‐only stream -
once($name, ?map, ?timeoutMs)
— resolve first matching event (optionally mapped) -
request($requestName, $responseName, $payload = [], $meta = [], ?$timeoutMs)
Emits a request with a newrid
, waits for the first response with the samerid
.
Timeouts require an async scheduler. This library provides
SwooleScheduler
which implementsAsyncSchedulerInterface
.
API Examples
1) Listen & emit
$bus->on('user.created')->subscribe(fn($e) => audit($e->getMeta(), $e->getPayload())); $bus->emitName('user.created', ['id' => 42], ['by' => 'admin']);
2) Request/Response with correlation id
// Responder: copies rid from incoming 'REQ' and emits 'RESP' $bus->on('REQ')->subscribe(function ($e) use ($bus) { $bus->emit( (new BasicEvent('RESP', ['ok' => true], $e->getMeta())) ->setRid($e->getRid()) // correlate ); }); // Caller: request() subscribes FIRST, then emits; no race conditions $bus->request('REQ', 'RESP', ['foo' => 'bar'], ['trace' => 'abc'], 100) ->subscribe( fn($resp) => var_dump($resp->getPayload()), // ['ok' => true] fn($err) => error_log($err->getMessage()) );
3) once()
with mapping & timeout
$bus->once('health.ok', fn($e) => $e->getMeta()['node'] ?? 'unknown', 50) ->subscribe( fn($node) => echo "node=$node\n", fn($err) => echo "timeout\n" ); $bus->emitName('health.ok', [], ['node' => 'api-1']);
4) Backpressure / batching (Rx composition)
$bus->on('order.created') ->bufferWithTimeOrCount(500, 100, $bus->scheduler()) // every 0.5s or 100 items ->filter(fn($batch) => !empty($batch)) ->subscribe(fn(array $batch) => persist_batch($batch));
Swoole integration tips
- HTTP server: in
on('request')
, emit an event with meta containing arespond
callable or theResponse
object. Downstream subscribers can produce aResponseEvent
. - Coroutines per subscriber: use Swoole coroutines in your subscribers if you do IO; Rx operators will orchestrate sequencing.
- Event loop in CLI: outside a Swoole
Server
, start/stop the reactor withSwoole\Event::wait()
/Event::exit()
for timers to fire.
Top comments (0)