:pg2 is a mostly unknown, but powerful Erlang module. It provides an API for creating process groups.
Process Group
So, what's a process group? Well... it's a group of Erlang/Elixir processes.
Perhaps, the correct question would be, why do we care about process groups? Well, process groups are the foundation for publisher-subscribers (pubsubs for short).
PG2
Understanding :pg2 API and how it relates to a pubsub API will make it easier to understand:
- Every process group is a
channele.g. a group called:my_channelis created:
iex> :pg2.create(:my_channel) :ok - Every process in a group is a
subscribere.g.self()is part of:my_channelgroup:
iex> :pg2.join(:my_channel, self()) :ok - A
publishercansend/2messages to achannele.g. the publisher gets all the members of the group:my_channeland sends"Some message":
iex> members = :pg2.get_members(:my_channel) :ok iex> for member <- members, do: send(member, "Some message") - A
subscriberwill receive the messages in its mailbox:
iex> flush() "Some message" :ok - A
subscribercan unsubscribe from achannele.g.self()leaves the group:my_channel:
iex> :pg2.leave(:my_channel, self()) :ok - A
channelcan be deleted:
iex> :pg2.delete(:my_channel) :ok And that's it! That's the API. And you know what's the best thing about it? It can work between connected nodes. Keep reading and you'll see :)
Implementing a PubSub
A PubSub has three main functions:
-
subscribe/1for subscribing to achannel:
def subscribe(channel) do pid = self() case :pg2.get_members(channel) do members when is_list(members) -> if pid in members do :ok # It's already subscribed. else :pg2.join(channel, pid) # Subscribes to channel end {:error, {:no_such_group, ^channel}} -> :pg2.create(channel) # Creates channel :pg2.join(channel, pid) # Subscribe to channel end end -
unsubscribe/1for unsubscribing from achannel.
def unsubscribe(channel) do pid = self() case :pg2.get_members(channel) do [^pid] -> :pg2.leave(channel, pid) # Unsubscribes from channel :pg2.delete(channel) # Deletes the channel members when is_list(members) -> if pid in members do :pg2.leave(channel, pid) # Unsubscribes from channel else :ok # It's already unsubscribed end _ -> :ok end end -
publish/2for sending amessageto achannel.
def publish(channel, message) do case :pg2.get_members(channel) do [_ | _] = members -> for member <- members, do: send(member, message) :ok _ -> :ok end end For a full implementation of PubSub you can check this gist.
I usually create a .iex.exs file in my $HOME folder and then run iex. You could do the same with the previous gist by doing the following:
~ $ PUBSUB="https://gist.githubusercontent.com/alexdesousa/4d592fe206cca17393affaefa4c8fd33/raw/4d84894f016bd9eef84bba647c77c62b9c9a6094/pub_sub.ex" ~ $ curl "$PUBSUB" -o .iex.exs ~ $ iex Distributed PubSub
For our distributed experiment we'll need two nodes. My machine is called matrix and both nodes will be neo and trinity respectively:
-
:neo@matrix:
alex@matrix ~ $ iex --sname neo iex(neo@matrix)1> -
:trinity@matrix:
alex@matrix ~ $ iex --sname trinity iex(trinity@matrix)1> Node.connect(:neo@matrix) # Connects both nodes Now :neo@matrix can subscribe to :mainframe channel:
iex(neo@matrix)1> PubSub.subscribe(:mainframe) :ok And :trinity@matrix can send a message:
iex(trinity@matrix)2> PubSub.publish(:mainframe, "Wake up, Neo...") :ok Note: Sometimes it takes a bit of time for nodes to synchronize their process groups, so you might need to
publish/2your message twice.
Finally, :neo@matrix should receive the message:
iex(neo@matrix)2> flush() "Wake up, Neo..." :ok And that's it. A powerful pubsub in a few lines of code thanks to :pg2.
Conclusion
Erlang has several built-in hidden gems like :pg2 that make our lives easier.
Happy coding!
Cover image by Nicolas Picard




Top comments (0)