Se você trabalha com PySpark e já precisou aplicar uma lógica dentro de uma coluna do tipo array, sua primeira reação provavelmente foi: "Vou criar uma UDF". É uma solução rápida e flexível, mas que esconde um grave problema de performance.
Neste post, vamos explorar a forma correta e muito mais eficiente de fazer isso usando Funções de Ordem Superior (Higher-Order Functions - HOFs).
O Custo Invisível das UDFs de Python
Uma User-Defined Function (UDF) em Python parece simples, mas por baixo dos panos, o Spark faz um trabalho caro:
- Serialização: Para cada linha, os dados da coluna saem do ambiente otimizado do Spark (JVM).
- Transferência: Os dados são enviados para um processo Python.
- Execução: Sua função Python é executada.
- Desserialização: O resultado volta da Python para a JVM.
Esse "ida e volta" para cada elemento de dados em um cluster é um gargalo gigante. Em datasets de milhões ou bilhões de linhas, isso pode fazer seu job demorar horas a mais ou até mesmo falhar.
A Solução: Funções de Ordem Superior (HOFs)
HOFs são funções nativas do Spark SQL que recebem outras funções (geralmente lambdas) como argumento para processar dados complexos, como arrays e mapas.
A grande vantagem é que toda a operação acontece dentro da JVM, sem o custo de serialização/desserialização. A lógica que você define na função lambda é executada pelo próprio motor do Spark, aproveitando toda a sua otimização.
Use HOFs quando precisar:
- Transformar cada elemento de um array (
transform
). - Filtrar elementos de um array com base em uma condição (
filter
). - Verificar se um elemento que satisfaz uma condição existe no array (
exists
). - Agregar elementos de um array (
aggregate
).
Mão na Massa: Exemplos Práticos
Vamos criar um DataFrame simples para nossos exemplos. Nossa coluna scores
é um array de inteiros.
from pyspark.sql import SparkSession from pyspark.sql.functions import expr, col # Criando a Spark Session spark = SparkSession.builder.appName("HOF_Examples").getOrCreate() # DataFrame de exemplo data = [ (1, "aluno_a", [80, 92, 75, 88]), (2, "aluno_b", [60, 70, 65, 58]), (3, "aluno_c", [95, 98, 100, 92]), ] columns = ["id", "aluno", "scores"] df_scores = spark.createDataFrame(data, columns)
1. transform
: Aplicando uma transformação a cada elemento
df_bonus = df_scores.withColumn( "scores_com_bonus", expr("transform(scores, x -> x + 10)") ) df_bonus.show(truncate=False)
2. filter
: Filtrando elementos de um array
df_aprovados = df_scores.withColumn( "notas_altas", expr("filter(scores, nota -> nota >= 90)") ) df_aprovados.show(truncate=False)
3. exists
: Verificando a existência de um elemento
df_nota_max = df_scores.withColumn( "tirou_100", expr("exists(scores, nota -> nota = 100)") ) df_nota_max.show()
Guia de Referência Rápida: Funções de Ordem Superior
Aqui está uma lista das HOFs mais comuns para você consultar.
Para Arrays
-
transform(array, function)
- Descrição: Aplica uma função a cada elemento do array e retorna um novo array com os resultados.
- Exemplo de Uso: Converter todos os nomes de um array para maiúsculas.
transform(nomes, nome -> upper(nome))
-
filter(array, function)
- Descrição: Retorna um novo array contendo apenas os elementos que satisfazem uma condição booleana.
- Exemplo de Uso: Manter apenas os números pares de um array.
filter(numeros, n -> n % 2 == 0)
-
exists(array, function)
- Descrição: Retorna
true
se pelo menos um elemento do array satisfaz a condição. - Exemplo de Uso: Verificar se há algum produto com status "URGENTE" em um array de status.
exists(status, s -> s = 'URGENTE')
- Descrição: Retorna
-
forall(array, function)
- Descrição: Retorna
true
se todos os elementos do array satisfazem a condição. - Exemplo de Uso: Checar se todas as tarefas de um projeto estão com status "CONCLUÍDO".
forall(tarefas, t -> t.status = 'CONCLUÍDO')
- Descrição: Retorna
-
aggregate(array, start, merge [, finish])
- Descrição: Reduz os elementos de um array a um único valor, começando com um valor inicial e aplicando uma função de merge.
- Exemplo de Uso: Somar todos os valores de um array de números.
aggregate(valores, 0, (acumulador, valor) -> acumulador + valor)
-
zip_with(array1, array2, function)
- Descrição: Une dois arrays, elemento por elemento, aplicando uma função que combina os pares.
- Exemplo de Uso: Calcular o total de cada item multiplicando um array de
quantidades
por um deprecos
.zip_with(quantidades, precos, (q, p) -> q * p)
Para Mapas
-
transform_keys(map, function)
- Descrição: Aplica uma função a cada chave do mapa e retorna um novo mapa.
- Exemplo de Uso: Padronizar todas as chaves de um mapa para minúsculas.
transform_keys(mapa, (k, v) -> lower(k))
-
transform_values(map, function)
- Descrição: Aplica uma função a cada valor do mapa e retorna um novo mapa.
- Exemplo de Uso: Aplicar um desconto de 10% em todos os preços de um mapa
produto -> preco
.transform_values(mapa, (k, v) -> v * 0.9)
-
map_filter(map, function)
- Descrição: Retorna um novo mapa contendo apenas as entradas que satisfazem uma condição.
- Exemplo de Uso: Filtrar um mapa de
produto -> estoque
para manter apenas produtos com estoque maior que zero.map_filter(mapa, (k, v) -> v > 0)
Experimente Você Mesmo!
Uma ótima maneira de testar tudo o que vimos aqui, sem a dor de cabeça de configurar um ambiente Spark local, é através da Databricks Free Edition, que oferece um cluster gratuito para estudo e desenvolvimento.
Para facilitar, deixei um notebook público com todo o código deste post pronto para ser visualizado e importado para a sua conta:
➡️ Notebook com os Exemplos: Visualizar no Databricks
Conclusão
Da próxima vez que você precisar manipular elementos dentro de um array (ou mapa) no PySpark, respire fundo e lembre-se das Funções de Ordem Superior.
Com os exemplos práticos e o guia de referência acima, você tem tudo o que precisa para começar. Pergunte-se: "Consigo resolver isso com transform
, filter
, exists
ou outra HOF?". A resposta quase sempre será "sim", e seu pipeline de dados vai te agradecer com uma performance muito superior.
Top comments (0)