Tuesday, April 30, 2019

fast reading of csv data using R



In order to quickly read specific rows from a csv, use RSQLite library in R



library(RSQLite)
library(DBI)
con <- dbConnect(RSQLite::SQLite(), "SQLite", dbname = "sample_db.sqlite")
dbWriteTable(con, name="sample_table", value=paste0(data_output_dir,"dm_full_temporal_tmp.csv"), 
             row.names=FALSE, header=TRUE, sep = "|",overwrite = T)
st <- Sys.time()
# Query your data as you like
yourData <- dbGetQuery(con, "SELECT * FROM sample_table where period in ('201810' , '201802') ")
Sys.time()-st

# 34 sec
dbDisconnect(con)

Monday, April 29, 2019

Spark pivot cast stack dataframe

Pivoting Data in SparkSQL

JANUARY 5TH, 2016

One of the core values at Silicon Valley Data Science (SVDS) is contributing back to the community, and one way we do that is through open source contributions. One of the many new features in Spark 1.6.0 is the ability to pivot data in data frames. This was a feature requested by one of my colleagues that I decided to work on.
Pivot tables are an essential part of data analysis and reporting. A pivot can be thought of as translating rows into columns while applying one or more aggregations. Many popular data manipulation tools (pandas, reshape2, and Excel) and databases (MS SQL and Oracle 11g) include the ability to pivot data.
Below you’ll find some examples of how to use pivot in PySpark, using this dataset: https://vincentarelbundock.github.io/Rdatasets/csv/ggplot2/mpg.csv
$ bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0
>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('mpg.csv')
>>> df.withColumnRenamed("manufacturer", "manuf").show(5) 
+---+-----+-----+-----+----+---+----------+---+---+---+---+-------+
|   |manuf|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+---+-----+-----+-----+----+---+----------+---+---+---+---+-------+
|  1| audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|  2| audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|  3| audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|  4| audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|  5| audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+---+-----+-----+-----+----+---+----------+---+---+---+---+-------+
If we wanted to see the average highway mpg of cars by class and year, we could do a simple group by and aggregation:
>>> df.groupBy('class', 'year').avg('hwy').show()
+----------+----+------------------+
|     class|year|          avg(hwy)|
+----------+----+------------------+
|    pickup|2008|16.941176470588236|
|subcompact|1999|              29.0|
|subcompact|2008|            27.125|
|   2seater|1999|              24.5|
|   2seater|2008|              25.0|
|   compact|1999|             27.92|
|   compact|2008|28.727272727272727|
|   minivan|1999|              22.5|
|   midsize|1999|              26.5|
|   minivan|2008|              22.2|
|   midsize|2008|28.047619047619047|
|       suv|1999|17.551724137931036|
|       suv|2008|18.636363636363637|
|    pickup|1999|           16.8125|
+----------+----+------------------+
Pivoting on the year column gives the same information, just laid out differently:
>>> df.groupBy('class').pivot('year').avg('hwy').show()
+----------+------------------+------------------+
|     class|              1999|              2008|
+----------+------------------+------------------+
|       suv|17.551724137931036|18.636363636363637|
|   2seater|              24.5|              25.0|
|    pickup|           16.8125|16.941176470588236|
|   midsize|              26.5|28.047619047619047|
|   compact|             27.92|28.727272727272727|
|   minivan|              22.5|              22.2|
|subcompact|              29.0|            27.125|
+----------+------------------+------------------+
We can also specify the values of the pivot column, which is more efficient as otherwise they need to be determined with a query to the data frame. To do so, just provide a list of values as the second argument to pivot.
>>> df.groupBy('class').pivot('year', [1999, 2008]).avg('hwy').show()
Finally, just like a normal group by we can use multiple aggregations:
>>> from pyspark.sql import functions as F
>>> df.groupBy('class').pivot('year', [1999, 2008]).agg(F.min(df.hwy), F.max(df.hwy)).show()
+----------+-------------+-------------+-------------+-------------+
|     class|1999_min(hwy)|1999_max(hwy)|2008_min(hwy)|2008_max(hwy)|
+----------+-------------+-------------+-------------+-------------+
|       suv|           15|           25|           12|           27|
|   2seater|           23|           26|           24|           26|
|    pickup|           15|           20|           12|           22|
|   midsize|           24|           29|           23|           32|
|   compact|           23|           44|           25|           37|
|   minivan|           21|           24|           17|           24|
|subcompact|           21|           44|           20|           36|
+----------+-------------+-------------+-------------+-------------+
Pivot is available in the ScalaJava, and Python API. This has just been a quick overview, but SVDS would love to hear how you’re using pivot, and some of your other favorite Spark features.

pyspark aggregate multiple columns with multiple functions

Separate list of columns and functions
Let's say you have a list of functions:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val funs: Seq[Column => Column] = Seq(sum _, min _, max _)
and a list of columns
val cols: Seq[Column] = Seq($"y", $"z")
and a dataset
val df = Seq((1, 2, 3), (1, 4, 5) ).toDF("x", "y", "z")
you can combine both
val exprs = for { c <- cols; f <- funs} yield f(c)
and then
df.groupBy($"x").agg(exprs.head, exprs.tail: _*)
The same thing could be done in PySpark:
from pyspark.sql import functions as F

funs = [F.sum, F.min, F.max]
cols = ["y", "z"]

df = spark.createDataFrame([(1, 2, 3), (1, 4, 5)], ("x", "y", "z"))

df.groupBy("x").agg(*[f(c) for c in cols for f in funs])
Predefined list of operations for each column
If you want start with predefined set of aliases, columns and functions, as the one shown in your question, it might be easier to just restructure it to
trait AggregationOp {
  def expr: Column
}

case class FuncAggregationOp(c: Column, func: Column => Column, alias: String
    ) extends AggregationOp {
  def expr = func(c).alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   FuncAggregationOp($"y", sum _, "alias_column_name1"),
   FuncAggregationOp($"z", sum _, "alias_column_name2")
)
val exprs = ops.map(_.expr)

df.groupBy($"x").agg(exprs.head, exprs.tail: _*)
You can easily adjust this to handle other cases:
case class StringAggregationOp(c: String, func: String, alias: String
    ) extends AggregationOp {
  def expr = org.apache.spark.sql.functions.expr(s"${func}(`${c}`)").alias(alias)
}

val ops: Seq[AggregationOp] = Seq(
   StringAggregationOp("y", "sum", "alias_column_name1"),
   StringAggregationOp("z", "sum", "alias_column_name2")
)
Python equivalent could be something like this:
from collections import namedtuple
from pyspark.sql import functions as F

class AggregationOp(namedtuple("Op", ["c", "func", "alias"])):
    def expr(self):
        if callable(self.func):
            return self.func(self.c).alias(self.alias)
        else:
            return F.expr("{func}(`{c}`)".format
                (func = self.func, c = self.c)).alias(self.alias)

ops = [
    AggregationOp("y", "sum", "alias_column_name1"),
    AggregationOp("z", "sum", "alias_column_name2")
]

 df.groupBy("x").agg(*[op.expr() for op in ops])

Scala:
You can for example map over a list of functions with a defined mapping from name to function:
import org.apache.spark.sql.functions.{col, min, max, mean}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+
or
df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show
Unfortunately parser which is used internally SQLContext is not exposed publicly but you can always try to build plain SQL queries:
df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")
Python:
from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"] 
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)

Loud fan of desktop

 Upon restart the fan of the desktop got loud again. I cleaned the desktop from the dust but it was still loud (Lower than the first sound) ...