DEV Community

Richardson
Richardson

Posted on

Usando Funções de Ordem Superior (Higher-Order Functions - HOFs)

Se você trabalha com PySpark e já precisou aplicar uma lógica dentro de uma coluna do tipo array, sua primeira reação provavelmente foi: "Vou criar uma UDF". É uma solução rápida e flexível, mas que esconde um grave problema de performance.

Neste post, vamos explorar a forma correta e muito mais eficiente de fazer isso usando Funções de Ordem Superior (Higher-Order Functions - HOFs).

O Custo Invisível das UDFs de Python

Uma User-Defined Function (UDF) em Python parece simples, mas por baixo dos panos, o Spark faz um trabalho caro:

  1. Serialização: Para cada linha, os dados da coluna saem do ambiente otimizado do Spark (JVM).
  2. Transferência: Os dados são enviados para um processo Python.
  3. Execução: Sua função Python é executada.
  4. Desserialização: O resultado volta da Python para a JVM.

Esse "ida e volta" para cada elemento de dados em um cluster é um gargalo gigante. Em datasets de milhões ou bilhões de linhas, isso pode fazer seu job demorar horas a mais ou até mesmo falhar.

A Solução: Funções de Ordem Superior (HOFs)

HOFs são funções nativas do Spark SQL que recebem outras funções (geralmente lambdas) como argumento para processar dados complexos, como arrays e mapas.

A grande vantagem é que toda a operação acontece dentro da JVM, sem o custo de serialização/desserialização. A lógica que você define na função lambda é executada pelo próprio motor do Spark, aproveitando toda a sua otimização.

Use HOFs quando precisar:

  • Transformar cada elemento de um array (transform).
  • Filtrar elementos de um array com base em uma condição (filter).
  • Verificar se um elemento que satisfaz uma condição existe no array (exists).
  • Agregar elementos de um array (aggregate).

Mão na Massa: Exemplos Práticos

Vamos criar um DataFrame simples para nossos exemplos. Nossa coluna scores é um array de inteiros.

from pyspark.sql import SparkSession from pyspark.sql.functions import expr, col # Criando a Spark Session spark = SparkSession.builder.appName("HOF_Examples").getOrCreate() # DataFrame de exemplo data = [ (1, "aluno_a", [80, 92, 75, 88]), (2, "aluno_b", [60, 70, 65, 58]), (3, "aluno_c", [95, 98, 100, 92]), ] columns = ["id", "aluno", "scores"] df_scores = spark.createDataFrame(data, columns) 
Enter fullscreen mode Exit fullscreen mode

1. transform: Aplicando uma transformação a cada elemento

df_bonus = df_scores.withColumn( "scores_com_bonus", expr("transform(scores, x -> x + 10)") ) df_bonus.show(truncate=False) 
Enter fullscreen mode Exit fullscreen mode

2. filter: Filtrando elementos de um array

df_aprovados = df_scores.withColumn( "notas_altas", expr("filter(scores, nota -> nota >= 90)") ) df_aprovados.show(truncate=False) 
Enter fullscreen mode Exit fullscreen mode

3. exists: Verificando a existência de um elemento

df_nota_max = df_scores.withColumn( "tirou_100", expr("exists(scores, nota -> nota = 100)") ) df_nota_max.show() 
Enter fullscreen mode Exit fullscreen mode

Guia de Referência Rápida: Funções de Ordem Superior

Aqui está uma lista das HOFs mais comuns para você consultar.

Para Arrays

  • transform(array, function)

    • Descrição: Aplica uma função a cada elemento do array e retorna um novo array com os resultados.
    • Exemplo de Uso: Converter todos os nomes de um array para maiúsculas. transform(nomes, nome -> upper(nome))
  • filter(array, function)

    • Descrição: Retorna um novo array contendo apenas os elementos que satisfazem uma condição booleana.
    • Exemplo de Uso: Manter apenas os números pares de um array. filter(numeros, n -> n % 2 == 0)
  • exists(array, function)

    • Descrição: Retorna true se pelo menos um elemento do array satisfaz a condição.
    • Exemplo de Uso: Verificar se há algum produto com status "URGENTE" em um array de status. exists(status, s -> s = 'URGENTE')
  • forall(array, function)

    • Descrição: Retorna true se todos os elementos do array satisfazem a condição.
    • Exemplo de Uso: Checar se todas as tarefas de um projeto estão com status "CONCLUÍDO". forall(tarefas, t -> t.status = 'CONCLUÍDO')
  • aggregate(array, start, merge [, finish])

    • Descrição: Reduz os elementos de um array a um único valor, começando com um valor inicial e aplicando uma função de merge.
    • Exemplo de Uso: Somar todos os valores de um array de números. aggregate(valores, 0, (acumulador, valor) -> acumulador + valor)
  • zip_with(array1, array2, function)

    • Descrição: Une dois arrays, elemento por elemento, aplicando uma função que combina os pares.
    • Exemplo de Uso: Calcular o total de cada item multiplicando um array de quantidades por um de precos. zip_with(quantidades, precos, (q, p) -> q * p)

Para Mapas

  • transform_keys(map, function)

    • Descrição: Aplica uma função a cada chave do mapa e retorna um novo mapa.
    • Exemplo de Uso: Padronizar todas as chaves de um mapa para minúsculas. transform_keys(mapa, (k, v) -> lower(k))
  • transform_values(map, function)

    • Descrição: Aplica uma função a cada valor do mapa e retorna um novo mapa.
    • Exemplo de Uso: Aplicar um desconto de 10% em todos os preços de um mapa produto -> preco. transform_values(mapa, (k, v) -> v * 0.9)
  • map_filter(map, function)

    • Descrição: Retorna um novo mapa contendo apenas as entradas que satisfazem uma condição.
    • Exemplo de Uso: Filtrar um mapa de produto -> estoque para manter apenas produtos com estoque maior que zero. map_filter(mapa, (k, v) -> v > 0)

Experimente Você Mesmo!

Uma ótima maneira de testar tudo o que vimos aqui, sem a dor de cabeça de configurar um ambiente Spark local, é através da Databricks Free Edition, que oferece um cluster gratuito para estudo e desenvolvimento.

Para facilitar, deixei um notebook público com todo o código deste post pronto para ser visualizado e importado para a sua conta:

➡️ Notebook com os Exemplos: Visualizar no Databricks

Conclusão

Da próxima vez que você precisar manipular elementos dentro de um array (ou mapa) no PySpark, respire fundo e lembre-se das Funções de Ordem Superior.

Com os exemplos práticos e o guia de referência acima, você tem tudo o que precisa para começar. Pergunte-se: "Consigo resolver isso com transform, filter, exists ou outra HOF?". A resposta quase sempre será "sim", e seu pipeline de dados vai te agradecer com uma performance muito superior.

Top comments (0)