Tasks are processes meant to execute a one-off piece of code, generally with little or no communication with other processes, and then return a result. They’re perfect for concurrent, isolated computations.
Here’s a taste of Tasks in action:
# One task: run the work in a separate process, then block for its reply.
task =
  Task.async(fn ->
    Process.sleep(1000)
    "Done!"
  end)

result = Task.await(task) # => "Done!"

# Several tasks: start all of them first, then collect the replies in order.
tasks = for x <- 1..3, do: Task.async(fn -> x * 2 end)
results = Enum.map(tasks, &Task.await/1) # => [2, 4, 6]
While you could use `spawn` for one-off processes, Tasks offer several advantages:
✅ Benefits of Tasks:
# Using spawn (basic process): the reply has to be wired up by hand.
# Capture the caller's pid first: inside the spawned function, self() is the child.
caller = self()

spawn(fn ->
  send(caller, {:result, expensive_operation()})
end)

result =
  receive do
    {:result, value} -> value
  after
    # Mirror the Task example below: give up after 5 seconds.
    :timer.seconds(5) -> exit(:timeout)
  end

# Using Task (better approach): the reply and the timeout are handled for you.
task = Task.async(fn -> expensive_operation() end)
result = Task.await(task, :timer.seconds(5)) # With timeout
Perfect for parallel computations where you need the results:
# Runs process_item/1 on every item concurrently: one task per item, all
# started up front, then awaited in the original order.
def process_items(items) do
  tasks = for item <- items, do: Task.async(fn -> process_item(item) end)
  for task <- tasks, do: Task.await(task)
end
# Real-world example: request every URL concurrently, allowing each await up to 5 seconds.
requests = for url <- urls, do: Task.async(fn -> HTTPoison.get!(url) end)
Enum.map(requests, fn request -> Task.await(request, :timer.seconds(5)) end)
When you don’t need the result:
# Logger's logging functions are macros, so the module must be required before use.
require Logger

# Start task without waiting
Task.start(fn ->
  Logger.info("Processing in background...")
  expensive_operation()
end)

# Supervised fire-and-forget: the task runs under the application's Task.Supervisor.
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  background_job()
end)
For better error handling and restart strategies:
# In your application supervisor
children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

# Starting supervised tasks. async_nolink does not link the task to the caller,
# so a crash in might_fail/0 comes back as {:exit, reason} instead of taking
# the caller down. Keep the task struct: it is needed to collect the outcome.
task =
  Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
    might_fail()
  end)

# Task.yield returns nil on timeout; Task.shutdown then stops the task and
# picks up a reply that arrived in the meantime.
case Task.yield(task, :timer.seconds(5)) || Task.shutdown(task) do
  {:ok, value} -> {:ok, value}
  {:exit, reason} -> {:error, reason}
  nil -> {:error, :timeout}
end
# Using Task.yield to handle timeouts gracefully
task =
  Task.async(fn ->
    :timer.sleep(2000)
    {:ok, "Slow operation complete"}
  end)

# Task.yield returns nil when the timeout elapses. Task.shutdown can itself
# return {:ok, reply} if the task finishes while it is being shut down, so the
# two are chained with || to avoid throwing that late reply away.
outcome =
  case Task.yield(task, 1000) || Task.shutdown(task) do
    {:ok, result} -> result
    {:exit, reason} -> {:error, reason}
    nil -> {:error, :timeout}
  end
defmodule ParallelProcessor do
  @moduledoc """
  Maps a function over a collection by splitting the collection into chunks
  and processing every chunk in its own task.
  """

  @doc """
  Applies `func` to each element of `collection`, using one task per chunk of
  `chunk_size` elements, and returns the results in the original order.

  The per-chunk results are joined one level deep, so a `func` that returns
  lists gets those lists back intact.

  All chunks share a single 30-second deadline; `Task.await_many/2` exits the
  caller if it is exceeded.

  ## Examples

      iex> ParallelProcessor.parallel_map(1..5, &(&1 * 2), 2)
      [2, 4, 6, 8, 10]
  """
  @spec parallel_map(Enumerable.t(), (term() -> term()), pos_integer()) :: [term()]
  def parallel_map(collection, func, chunk_size \\ 1000)
      when is_function(func, 1) and is_integer(chunk_size) and chunk_size > 0 do
    collection
    |> Enum.chunk_every(chunk_size)
    |> Enum.map(fn chunk -> Task.async(fn -> Enum.map(chunk, func) end) end)
    |> Task.await_many(:timer.seconds(30))
    # Enum.concat/1 joins the chunk results one level deep; List.flatten/1
    # would also flatten any lists that func itself returned.
    |> Enum.concat()
  end
end
# Usage example: the range is the collection; chunk_size keeps its default.
ParallelProcessor.parallel_map(1..10_000, &expensive_operation/1)
defmodule TaskPipeline do
  @moduledoc """
  Pushes a value through three stages, running each stage in its own task.

  The stages run strictly one after another, because each one needs the
  previous stage's result, so the tasks do not make the pipeline any faster.
  """

  @doc """
  Runs `input` through `stage_1`, `stage_2` and `stage_3` in that order and
  returns the result of the last stage.

  Each stage is awaited with the default `Task.await/1` timeout.
  """
  def process_data(input) do
    input
    |> run_stage(&stage_1/1)
    |> run_stage(&stage_2/1)
    |> run_stage(&stage_3/1)
  end

  # Runs one stage on `data` in a task and blocks until the task replies.
  # Task.async takes only the function, so `data` is passed in via the closure.
  defp run_stage(data, stage) do
    task = Task.async(fn -> stage.(data) end)
    Task.await(task)
  end

  # Placeholder stages: each one passes its input through unchanged.
  # Replace the bodies with the real implementation.
  defp stage_1(data), do: data
  defp stage_2(data), do: data
  defp stage_3(data), do: data
end
# Parallel API requests, limited to one batch in flight at a time
defmodule ApiClient do
  @moduledoc """
  Fetches users concurrently in fixed-size batches: every id in a batch gets
  its own task, and the next batch starts only after the current one is done.
  """

  @batch_size 10
  @batch_timeout :timer.seconds(5)

  @doc """
  Returns one user map per id in `user_ids`, in the same order.

  Each batch of up to 10 ids has 5 seconds to complete; `Task.await_many/2`
  exits the caller if a batch takes longer.
  """
  def fetch_all_users(user_ids) do
    user_ids
    |> Stream.chunk_every(@batch_size)
    |> Enum.flat_map(&fetch_batch/1)
  end

  # Fetches a single batch: one task per id, then waits for all of them.
  defp fetch_batch(ids) do
    tasks = for id <- ids, do: Task.async(fn -> fetch_user(id) end)
    Task.await_many(tasks, @batch_timeout)
  end

  defp fetch_user(id) do
    Process.sleep(100) # Simulate API call
    %{id: id, name: "User #{id}"}
  end
end
# Background job processing with error handling
defmodule BackgroundJob do
  @moduledoc """
  Runs jobs concurrently as unlinked, supervised tasks and reports one
  outcome per job instead of crashing the caller.
  """

  # Logger.error/1 is a macro, so Logger must be required.
  require Logger

  @timeout :timer.seconds(10)

  @doc """
  Runs every job in `jobs` under `MyApp.TaskSupervisor` and returns one
  result per job, in the same order as `jobs`:

    * the value returned by `process_job/1` when the job succeeds
    * `{:error, job, exception}` when the job raised
    * `{:error, {:exit, reason}}` when the task exited or threw
    * `{:error, :timeout}` when the job had not finished within 10 seconds

  All jobs share one 10-second deadline, measured from when the tasks are
  started; unfinished tasks are shut down.
  """
  def process_async(jobs) do
    supervisor = MyApp.TaskSupervisor

    jobs
    |> Enum.map(fn job ->
      Task.Supervisor.async_nolink(supervisor, fn -> run_job(job) end)
    end)
    |> Task.yield_many(@timeout)
    |> Enum.map(fn {task, outcome} ->
      # outcome is nil for a task that has not replied yet; Task.shutdown/1
      # stops it and still returns a reply that arrived in the meantime.
      case outcome || Task.shutdown(task) do
        {:ok, result} -> result
        # rescue only covers raised exceptions; exits and throws end up here.
        {:exit, reason} -> {:error, {:exit, reason}}
        nil -> {:error, :timeout}
      end
    end)
  end

  # Runs a single job, converting a raised exception into an error tuple.
  # process_job/1 is not defined in this snippet.
  defp run_job(job) do
    process_job(job)
  rescue
    e ->
      Logger.error("Job failed: #{inspect(e)}")
      {:error, job, e}
  end
end
# Example of controlling concurrency
defmodule ConcurrencyControl do
  @moduledoc """
  Processes a collection with a bounded number of concurrent tasks.
  """

  @timeout 5000

  @doc """
  Runs `process_item/1` on every element of `items` with at most
  `2 * System.schedulers_online()` tasks in flight.

  Returns the `Task.async_stream/3` results as a list, i.e. one
  `{:ok, result}` tuple per item, in input order. A task that takes longer
  than 5000 ms exits the caller (the `async_stream` default).
  """
  def parallel_process(items) do
    # process_item/1 is not defined in this snippet.
    items
    |> Task.async_stream(
      &process_item/1,
      max_concurrency: max_concurrency(),
      timeout: @timeout
    )
    |> Enum.to_list()
  end

  # Computed on every call rather than stored in a module attribute: an
  # attribute is evaluated once at compile time, on the machine that builds
  # the code, which may have a different number of schedulers.
  defp max_concurrency, do: System.schedulers_online() * 2
end
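✅ Do:
- Pair every `Task.async` with a `Task.await`.
- Use `Task.yield_many` for handling multiple tasks with timeouts.

❌ Don’t:
- Rely on `Task.await` alone when a task may run long; use `Task.yield` for more control over timeout handling.
- Let a failing task take down its caller; use `Task.Supervisor.async_nolink` to prevent error propagation.

✅ Great for:
❌ Consider alternatives for: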
Try this simple example in `iex`:
# Start an async computation
iex> task = Task.async(fn ->
...>   :timer.sleep(1000)
...>   "Hello from task!"
...> end)
# Do other work while task runs...
iex> "Working..."
# Get the result
iex> Task.await(task)
"Hello from task!"