Monday, April 29, 2019

pyspark aggregate multiple columns with multiple functions

Separate list of columns and functions
Let's say you have a list of functions:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val funs: Seq[Column => Column] = Seq(sum _, min _, max _)
and a list of columns
val cols: Seq[Column] = Seq($"y", $"z")
and a dataset
val df = Seq((1, 2, 3), (1, 4, 5) ).toDF("x", "y", "z")
you can combine both
val exprs = for { c <- cols; f <- funs} yield f(c)
and then
df.groupBy($"x").agg(exprs.head, exprs.tail: _*)
The same thing could be done in PySpark:
from pyspark.sql import functions as F

funs = [F.sum, F.min, F.max]
cols = ["y", "z"]

df = spark.createDataFrame([(1, 2, 3), (1, 4, 5)], ("x", "y", "z"))

df.groupBy("x").agg(*[f(c) for c in cols for f in funs])
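The comprehension above walks the columns in the outer loop and the functions in the inner loop, so the result columns come out grouped by source column. A minimal sketch of that ordering, using plain strings as stand-ins for the Spark functions so it runs without a cluster; the `alias_for` helper and its `<column>_<function>` naming scheme are assumptions for illustration, not part of any Spark API:

```python
funs = ["sum", "min", "max"]   # stand-ins for F.sum, F.min, F.max
cols = ["y", "z"]

# Same iteration order as: [f(c) for c in cols for f in funs]
pairs = [(c, f) for c in cols for f in funs]


def alias_for(c, f):
    """Hypothetical naming scheme: '<column>_<function>'."""
    return "{}_{}".format(c, f)


aliases = [alias_for(c, f) for c, f in pairs]
print(aliases)
```

With Spark available, the same names could be attached with `.alias(...)` by keeping a name-to-function mapping such as `{"sum": F.sum}`, similar to the mapping used in the Scala section further down.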
Predefined list of operations for each column
If you want to start with a predefined set of aliases, columns and functions, like the one shown in the question, it might be easier to restructure it as follows:
trait AggregationOp {
  def expr: Column
}

case class FuncAggregationOp(c: Column, func: Column => Column, alias: String
    ) extends AggregationOp {
  def expr = func(c).alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   FuncAggregationOp($"y", sum _, "alias_column_name1"),
   FuncAggregationOp($"z", sum _, "alias_column_name2")
)
val exprs = ops.map(_.expr)

df.groupBy($"x").agg(exprs.head, exprs.tail: _*)
You can easily adjust this to handle other cases:
case class StringAggregationOp(c: String, func: String, alias: String
    ) extends AggregationOp {
  def expr = org.apache.spark.sql.functions.expr(s"${func}(`${c}`)").alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   StringAggregationOp("y", "sum", "alias_column_name1"),
   StringAggregationOp("z", "sum", "alias_column_name2")
)
A Python equivalent could be something like this:
from collections import namedtuple
from pyspark.sql import functions as F

class AggregationOp(namedtuple("Op", ["c", "func", "alias"])):
    def expr(self):
        if callable(self.func):
            return self.func(self.c).alias(self.alias)
        else:
            return F.expr("{func}(`{c}`)".format(
                func=self.func, c=self.c)).alias(self.alias)

ops = [
    AggregationOp("y", "sum", "alias_column_name1"),
    AggregationOp("z", "sum", "alias_column_name2")
]

df.groupBy("x").agg(*[op.expr() for op in ops])
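When `func` is a string, the class above only formats a small SQL snippet and hands it to `F.expr`. A minimal sketch of that formatting step on its own, runnable without Spark; `expr_string` is a hypothetical helper name. The backticks are what keep a column name containing a space or a dot together as a single identifier:

```python
def expr_string(func, c):
    """Build the SQL snippet that the string branch would pass to F.expr."""
    return "{func}(`{c}`)".format(func=func, c=c)


print(expr_string("sum", "y"))
print(expr_string("max", "unit price"))
print(expr_string("min", "a.b"))
```

This prints `` sum(`y`) ``, `` max(`unit price`) `` and `` min(`a.b`) ``. A column name that itself contains a backtick is not handled by this sketch.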

Scala:
You can for example map over a list of functions with a defined mapping from name to function:
import org.apache.spark.sql.functions.{col, min, max, avg}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+
or
df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show
Unfortunately, the parser used internally by SQLContext is not exposed publicly, but you can always try to build plain SQL queries:
df.registerTempTable("df")  // on Spark 2.0+, prefer df.createOrReplaceTempView("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")
Python:
from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"] 
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)
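The plain SQL approach from the Scala section carries over to Python as ordinary string building. A minimal sketch, assuming the DataFrame has already been registered under the name `df` and that the column names are simple identifiers that need no quoting; the variable names are illustrative:

```python
group_by = ["k"]
aggregate = ["v"]
operations = ["min", "max", "mean"]

group_exprs = ", ".join(group_by)
# One "<function>(<column>) AS <column>_<function>" item per pair
agg_exprs = ", ".join(
    "{f}({c}) AS {c}_{f}".format(f=f, c=c)
    for c in aggregate
    for f in operations
)

query = "SELECT {g}, {a} FROM df GROUP BY {g}".format(g=group_exprs, a=agg_exprs)
print(query)

# With a live session the string would then be executed, for example:
#   sqlContext.sql(query)
```

Because the query is assembled from raw strings, this only makes sense when the column and function names come from trusted code, not from user input.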
