Uma breve história...
Estava eu ajudando um colega com uma tarefa envolvendo tarefas assíncronas com golang. "Qual a melhor forma de fazer "x" coisa?". Nesse momento me vi diante de uma vastidão de possibilidades de resolver aquele problema. Como explicar então, de forma satisfatória, sem transformar a próxima meia hora num monólogo chato?
A ideia de compilar soluções
Havia lido tempos atrás essa solução que o autor também não atribuía a si, novidade na programação (só que não!). O artigo era o seguinte MULTIPLEXING CHANNELS IN GO, aconselho que leiam se quiserem aprofundar em alguns outros tópicos como canais.
A seguir apresentarei de forma bem direta a ideia de multiplexação.
O conceito:
Estamos falando da possibilidade de agregar um conjunto de canais à um único canal. Dessa forma podemos convergir os dados à um único ponto que será a retomada da sincronicidade do nosso código.
Um multiplexador (abreviação: MUX), por vezes denominado pelos anglicismos multiplexer ou multiplex, é um dispositivo que seleciona as informações de duas ou mais fontes de dados num único canal. São utilizados em situações onde o custo de implementação de canais separados para cada fonte de dados é maior que o custo e a inconveniência de utilizar as funções de multiplexação/demultiplexação. - Wikipedia. Repare nas imagens.
Vamos ao código
Seguiremos os seguintes passos:
- Criaremos uma função de espalhamento/divergência responsável por distribuir as Tasks para vários Workers.
- Criaremos a função do Worker que será responsável por executar a tarefa e encaminhar os resultados para o canal multiplexação (Result)
- Criaremos a Função de agregação responsável por receber os resultados e retornar ao fluxo síncrono da execução.
consideraremos para esse cenário as seguintes premissas:
// Tipo gerado para cada task poderíamos ter uma struct aqui type Task int // Tipo gerado para cada result poderíamos ter uma struct aqui type Result int // Função que representa um tempo de execução qualquer // para finalização da tarefa do worker func asyncSimulation(t Task) int { time.Sleep(1 * time.Second) return (int(t) * int(t)) }
0 - Nossa Função main:
func main() { // Trabalharemos com 10 tasks para serem completadas tasks := taskGenerator(1, 10) results := make(chan Result) // Esse wait group servirá para controlar o fechamento // do canal de result e evitar deadlocks wg := &sync.WaitGroup{} // 5 workers serão utilizados nesse exemplo for i := 0; i < 5; i++ { wg.Add(1) worker(tasks, results, wg) } // Verificamos o momento que todos os workers // terminarem de trabalhar verifyEnd(results, wg) // Recebemos todos os resultados no canal result // e imprimimos á medida que são resolvidos resultAggregator(results) }
1 - A função de espalhamento:
Essa é a função responsável por alimentar o canal de tasks com as tasks que serão executadas.
Como não definimos o tamanho do buffer do canal é necessário que a atribuição das tasks se faça numa goroutine para não gerarmos um deadlock.
func taskGenerator(start int, end int) <-chan Task { tasks := make(chan Task) go func() { for i := start; i < end; i++ { tasks <- Task(i) } close(tasks) }() return tasks }
2 - Trabalhando com mais de um Worker:
Para trabalharmos com mais de um worker precisamos também gerenciar o fechamento do canal de results, para isso definimos um wait group para os workers
func worker(tasks <-chan Task, result chan<- Result, wg *sync.WaitGroup) { go func() { for task := range tasks { result <- Result(asyncSimulation(task)) } wg.Done() }() } func verifyEnd(results chan<- Result, wg *sync.WaitGroup) { go func() { wg.Wait() close(results) }() }
3 - A função de agregação:
Utilizamos essa função para receber todos os dados repassados para o canal de results e posteriormente somamos e mostramos eles na tela
func resultAggregator(res <-chan Result) { sum := 0 totalResults := 0 for res := range res { fmt.Printf("received result %v\n", res) sum += int(res) totalResults += 1 } fmt.Printf("total os squares received: %d\n", totalResults) fmt.Printf("sum of squares: %d", sum) }
Análise do resultado
// Após 1 segundo recebemos os primeiros 5 resultados // tempo gasto pelos 5 workers para processar as primeiras // 5 tasks received result 16 received result 9 received result 4 received result 0 received result 1 // Após mais 1 segundo recebemos os próximos 5 resultados // tempo gasto pelos 5 workers para processar as últimas // 5 tasks received result 36 received result 25 received result 49 received result 64 received result 81 // impressão das demais informações do agregador total os squares received: 10 sum of squares: 285
Expandindo a aplicabilidade:
Esse conceito de multiplexação é apenas um dentre vários que podemos utilizar para garantir o trabalho de forma assíncrona para solução de diversos problemas. Sua aplicação vai desde requisições para APIs externas até execução de algoritmos custosos em segmentos menores.
Fontes
MULTIPLEXING CHANNELS IN GO - https://katcipis.github.io/blog/mux-channels-go/
My Go Resolutions for 2017 - https://research.swtch.com/go2017
Multiplexador - https://pt.wikipedia.org/wiki/Multiplexador
Top comments (3)
Ficou muito bom seu artigo Luiz, meus parabéns!
Muito bem explicado! Channels é um conhecimento que leva a um outro nivel quando estamos nos aventurando em Golang.
Muito bom Luizao!! 👏👏👏