Distributed sorting with Elixir genservers
Introduction
In this post I am going to discuss how to implement a distributed sorting algorithm using Elixir genservers. The algorithm is based on the quicksort algorithm, which is a comparison sort algorithm that uses a divide-and-conquer strategy to sort an array. The algorithm is implemented in a distributed way, where each process is responsible for sorting a subset of the array. The processes communicate with each other to exchange information about the sorted subsets and to merge them into a final sorted array.
Disclaimer: This is probably not the best use of genservers and could have much easily been implemented using other Elixir abstractions such as tasks. However, since I wanted to learn how to use Genservers, I decided to implement the algorithm using them.
You can find the full code for this implementation in this Github repository.
The algorithm works as follows:
- The main process receives a list to be sorted and the number of processes to be used.
- The main process creates a master process that will be responsible for coordinating the sorting process.
- The master process divides the array into
x
smaller subsets, wherex
is the result of the array being divided by a constant numberk
. - The master process creates
y
genservers, each responsible for sorting a subset of the array. - The master process sends the subsets to the genservers.
- Each genserver sorts its subset using the quicksort algorithm.
- Each genserver sends the sorted subset back to the main process.
- At each 2 received sorted arrays, the master process sends them to a genserver that merges them into a single sorted subset and returns it back to the master process.
- The process is repeated until there is only one sorted subset left with the same size of the original list.
Elixir implementation
Mix
The project was created via mix
which is a build tool that provides tasks for creating, compiling, testing your application, managing its dependencies and much more.
In the project definition we define the application to come from our DistributedSorting
module:
def application do
app = [
extra_applications: [:logger]
# mod: {DistributedSorting, []} # Only if not Mix.env() == :test
]
if Mix.env() != :test do
app ++ [mod: {DistributedSorting, []}]
else
app
end
end
Run the application as mix run -- --file yourfilepath.txt --workers 100
Application behaviour
This module uses the Application
behaviour and is responsible for starting and stopping the application. It is also responsible for starting the main process that will coordinate the sorting process.
defmodule DistributedSorting do
use Application
def read_file_to_sort(file_path) do
{:ok, content} = File.read(file_path)
content
|> String.split("\n")
|> Enum.map(&String.to_integer/1)
end
defp parse_args(args) do
{parsed, _, _} = OptionParser.parse(args, strict: [file: :string, workers: :integer])
input_file = parsed[:file]
n_workers = parsed[:workers]
{input_file, n_workers}
end
def start(_start_type, _start_args) do
args = System.argv()
{input_file, n_workers} = parse_args(args)
array_to_sort = read_file_to_sort(input_file)
{:ok, pid} = DistributedMaster.start_link({array_to_sort, n_workers})
ref = Process.monitor(pid)
receive do
{:DOWN, ^ref, :process, _object, _reason} ->
IO.puts("Master process terminated")
end
{:ok, pid}
end
end
The module starts the application by reading a input file whose path is passed as an argument. The file contains a list of integers to be sorted in the following format:
99022
20655
-63097
88111
4412
3499
42423
91401
61189
-88446
37496
-71973
21591
-27418
-42662
95704
68402
-45611
-89018
(...)
Genservers
The module then starts the main process DistributedMaster
that will coordinate the sorting process.
defmodule DistributedMaster do
@moduledoc """
Distributed Master module.
This module implements a GenServer that starts worker processes and sends lists to be sorted.
It initally calls the worker processes to sort the lists and then merges the sorted lists.
It implements 1 different call message:
- {:add_sorted_list, list} - Adds a sorted list to the list of sorted lists and merges two lists when two lists are sorted.
Once it finishes it will write the sorted list to a file and terminate the worker processes. The file will be named sorted_integers.txt and will contain the sorted integers.
"""
use GenServer
@impl true
def init({array_to_sort, n_workers}) do
IO.puts("Master started with pid #{inspect(self())} \n")
n_elements = length(array_to_sort)
sorted_lists = []
worker_pids = start_worker_processes(n_workers)
start_initial_sorting(worker_pids, array_to_sort)
{:ok, {sorted_lists, n_elements, worker_pids}}
end
def start_initial_sorting(available_workers, array_to_sort) do
chunk_size = 500
to_sort_lists = Enum.chunk_every(array_to_sort, chunk_size, chunk_size)
worker_cycle = available_workers |> Stream.cycle()
Enum.zip(worker_cycle, to_sort_lists)
|> Enum.each(fn {worker_pid, list} -> DistributedWorker.sort(worker_pid, list) end)
end
def start_worker_processes(n_workers) do
1..n_workers
|> Enum.map(fn _ -> self() |> DistributedWorker.start_link() end)
|> Enum.map(fn {:ok, pid} -> pid end)
end
@impl true
def handle_call({:add_sorted_list, list}, from, state) do
{sorted_lists, n_elements, worker_pids} = state
sorted_lists = sorted_lists ++ [list]
already_sorted_lists = length(sorted_lists)
length_of_current_list = length(list)
case {already_sorted_lists, length_of_current_list} do
{1, ^n_elements} ->
IO.puts("All elements sorted\n")
to_write = list |> Enum.join("\n")
File.write("sorted_integers.txt", to_write, [:write])
sorted_lists = []
{:stop, :normal, :ok, {sorted_lists, n_elements, worker_pids}}
{2, _} ->
IO.puts("Two lists sorted -> will call merge_sorted\n")
{worker_pid, _} = from
[first, second] = sorted_lists
sorted_lists = []
worker_pid |> DistributedWorker.merge_sorted({first, second})
{:reply, sorted_lists, {sorted_lists, n_elements, worker_pids}}
{_, _} ->
{:reply, sorted_lists, {sorted_lists, n_elements, worker_pids}}
end
end
@impl true
def terminate(_reason, state) do
IO.puts("Terminating DistributedMaster\n")
{_, _, worker_pids} = state
worker_pids |> Enum.each(fn pid -> DistributedWorker.finish(pid) end)
end
def add_sorted_list(pid, list) do
GenServer.call(pid, {:add_sorted_list, list})
end
def start_link({array_to_sort, n_workers}) do
GenServer.start_link(__MODULE__, {array_to_sort, n_workers})
end
end
The DistributedMaster
module is a Genserver that starts worker processes and sends lists to be sorted. It initially calls the worker processes to sort the lists and then merges the sorted lists. It implements 1 different call message:
-
{:add_sorted_list, list}
- Adds a sorted list to the list of sorted lists and calls for merging two lists when two lists were returned. Finally exits when one list of the size of the original is remaining.
Elixir genservers are lightweight processes that are used to manage state and handle messages. They are implemented using the GenServer
module, which provides a set of callbacks that define the behaviour of the genserver. The init
callback is called when the genserver is started and is responsible for initializing the genserver’s state. The handle_call
callback is called when a message is sent to the genserver using the GenServer.call
function. The terminate
callback is called when the genserver is terminated and is responsible for cleaning up any resources used by the genserver.
Processes receive messages in their mailbox
which is a queue where messages are stored until they are processed. The GenServer.call
function is used to send a message to a genserver and wait for a reply. The GenServer.reply
function is used to send a reply to a message that was received by a genserver. The GenServer.cast
function is used to send a message to a genserver without waiting for a reply.
In our case the distributed master process handles syncronous calls to add sorted lists and through pattern matching decides what to do with the sorted lists:
- When two lists are sorted it casts a messsage
message_sorted
to theDistributedWorker.merge_sorted
function to merge the two lists. - When all lists are sorted it writes the sorted list to a file and returns
:stop
to terminate the genserver. If this is the case it also terminates all the child worker processes on theterminate
callback.
The DistributedWorker
module is a genserver that sorts a list of integers using the quicksort algorithm. The module implements 2 different cast messages:
-
{:sort, list}
- Sorts a list of integers using the quicksort algorithm. -
{:merge_sorted, {list1, list2}}
- Merges two sorted lists into a single sorted list.
defmodule DistributedWorker do
@moduledoc """
Distributed Worker module
This module implements a GenServer that sorts a list of integers and merges two sorted lists.
It implements 3 different cast messages:
- {:sort, caller_pid, list} - Sorts the list and sends the sorted list to the caller_pid
- {:merge_sorted_lists, caller_pid, list1, list2} - Merges the two sorted lists and sends the merged list to the caller_pid
- :finish - Stops the GenServer
"""
import Sorter, only: [merge_sorted_lists: 3, sort: 1]
use GenServer
def start_link(master_pid) do
GenServer.start_link(__MODULE__, master_pid)
end
@impl true
def init(master_pid) do
IO.puts("Worker started with PID #{inspect(self())}\n")
{:ok, master_pid}
end
@impl true
def handle_cast({:sort, list}, state) do
IO.puts("State #{inspect(state)}\n")
master_pid = state
sorted = sort(list)
master_pid |> DistributedMaster.add_sorted_list(sorted)
{:noreply, master_pid}
end
@impl true
def handle_cast({:merge_sorted_lists, list1, list2}, state) do
master_pid = state
sorted = merge_sorted_lists([], list1, list2)
master_pid |> DistributedMaster.add_sorted_list(sorted)
{:noreply, master_pid}
end
@impl true
def handle_cast(:finish, _state) do
IO.puts("Stopping Worker\n")
{:stop, :normal, %{}}
end
@impl true
def terminate(_reason, _state) do
IO.puts("Terminating Worker\n")
end
def finish(pid) do
GenServer.cast(pid, :finish)
end
def sort(pid, list) do
GenServer.cast(pid, {:sort, list})
end
def merge_sorted(pid, {list1, list2}) do
GenServer.cast(pid, {:merge_sorted_lists, list1, list2})
end
end
Notice that the DistributedWorker starts with a master_pid
that is the pid of the DistributedMaster
genserver. This is used to send messages to the master process to add the sorted lists. The DistributedWorker
module is a genserver that sorts a list of integers using the quicksort algorithm. The module implements 2 different cast messages:
The process id is stored in the state via the init
callback.
Sorter module logic
The worker module imports the Sorter
module merge_sorted_lists/3
and sort/1
functions.
defmodule Sorter do
@moduledoc """
Module for sorting lists.
It contains functionality for merging two sorted lists and sorting a list.
Merging sorted lists complexity is O(m + n) where m and n are the lengths of the lists.
Sorting a list complexity is O(n * log(n)) where n is the length of the list.
"""
def merge_sorted_lists(sorted_acc, [], []) do
sorted_acc
end
def merge_sorted_lists(sorted_acc, first, []) do
sorted_acc ++ first
end
def merge_sorted_lists(sorted_acc, [], second) do
sorted_acc ++ second
end
def merge_sorted_lists(sorted_acc, first, second) do
[lhead | ltail] = first
[rhead | rtail] = second
{sorted_acc, first, second} =
if lhead <= rhead do
{sorted_acc ++ [lhead], ltail, second}
else
{sorted_acc ++ [rhead], first, rtail}
end
merge_sorted_lists(sorted_acc, first, second)
end
def sort(list) do
Enum.sort(list)
end
end
Notice the pattern-matching and recursion usage of the merge_sorted_lists
function calls instead of explicit if-else statements that are checked
during application runtime.
Instead we define a function for each possible case, to be efficiently compiled and let the BEAM VM decide which function to call at runtime.