DEV Community

Cover image for Elixir Pubsub In Less Than 50 Lines
Alex de Sousa
Alex de Sousa

Posted on • Originally published at thebroken.link

Elixir Pubsub In Less Than 50 Lines

:pg2 is a mostly unknown, but powerful Erlang module. It provides an API for creating process groups.

Process Group

So, what's a process group? Well... it's a group of Erlang/Elixir processes.

Perhaps, the correct question would be, why do we care about process groups? Well, process groups are the foundation for publisher-subscribers (pubsubs for short).

PG2

Understanding :pg2 API and how it relates to a pubsub API will make it easier to understand:

  • Every process group is a channel e.g. a group called :my_channel is created:
 iex> :pg2.create(:my_channel) :ok 
  • Every process in a group is a subscriber e.g. self() is part of :my_channel group:
 iex> :pg2.join(:my_channel, self()) :ok 
  • A publisher can send/2 messages to a channel e.g. the publisher gets all the members of the group :my_channel and sends "Some message":
 iex> members = :pg2.get_members(:my_channel) :ok iex> for member <- members, do: send(member, "Some message") 
  • A subscriber will receive the messages in its mailbox:
 iex> flush() "Some message" :ok 
  • A subscriber can unsubscribe from a channel e.g. self() leaves the group :my_channel:
 iex> :pg2.leave(:my_channel, self()) :ok 
  • A channel can be deleted:
 iex> :pg2.delete(:my_channel) :ok 

And that's it! That's the API. And you know what's the best thing about it? It can work between connected nodes. Keep reading and you'll see :)

Message in cereal

Implementing a PubSub

A PubSub has three main functions:

  • subscribe/1 for subscribing to a channel:
 def subscribe(channel) do pid = self() case :pg2.get_members(channel) do members when is_list(members) -> if pid in members do :ok # It's already subscribed. else :pg2.join(channel, pid) # Subscribes to channel end {:error, {:no_such_group, ^channel}} -> :pg2.create(channel) # Creates channel :pg2.join(channel, pid) # Subscribe to channel end end 
  • unsubscribe/1 for unsubscribing from a channel.
 def unsubscribe(channel) do pid = self() case :pg2.get_members(channel) do [^pid] -> :pg2.leave(channel, pid) # Unsubscribes from channel :pg2.delete(channel) # Deletes the channel members when is_list(members) -> if pid in members do :pg2.leave(channel, pid) # Unsubscribes from channel else :ok # It's already unsubscribed end _ -> :ok end end 
  • publish/2 for sending a message to a channel.
 def publish(channel, message) do case :pg2.get_members(channel) do [_ | _] = members -> for member <- members, do: send(member, message) :ok _ -> :ok end end 

For a full implementation of PubSub you can check this gist.

I usually create a .iex.exs file in my $HOME folder and then run iex. You could do the same with the previous gist by doing the following:

~ $ PUBSUB="https://gist.githubusercontent.com/alexdesousa/4d592fe206cca17393affaefa4c8fd33/raw/4d84894f016bd9eef84bba647c77c62b9c9a6094/pub_sub.ex" ~ $ curl "$PUBSUB" -o .iex.exs ~ $ iex 

It's that easy

Distributed PubSub

For our distributed experiment we'll need two nodes. My machine is called matrix and both nodes will be neo and trinity respectively:

  • :neo@matrix:
 alex@matrix ~ $ iex --sname neo iex(neo@matrix)1> 
  • :trinity@matrix:
 alex@matrix ~ $ iex --sname trinity iex(trinity@matrix)1> Node.connect(:neo@matrix) # Connects both nodes 

Now :neo@matrix can subscribe to :mainframe channel:

iex(neo@matrix)1> PubSub.subscribe(:mainframe) :ok 

And :trinity@matrix can send a message:

iex(trinity@matrix)2> PubSub.publish(:mainframe, "Wake up, Neo...") :ok 

Note: Sometimes it takes a bit of time for nodes to synchronize their process groups, so you might need to publish/2 your message twice.

Finally, :neo@matrix should receive the message:

iex(neo@matrix)2> flush() "Wake up, Neo..." :ok 

And that's it. A powerful pubsub in a few lines of code thanks to :pg2.

Follow the white rabbit

Conclusion

Erlang has several built-in hidden gems like :pg2 that make our lives easier.

gem

Happy coding!

Cover image by Nicolas Picard

Top comments (0)