Friday, May 3, 2019

Pyarrow speed up conversions between pandas and dataframes

Long story short:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

Speeding up PySpark with Apache Arrow 

Published 26 Jul 2017 
Bryan Cutler is a software engineer at IBM’s Spark Technology Center STC
Beginning with Apache Spark version 2.3, Apache Arrow will be a supported dependency and begin to offer increased performance with columnar data transfer. If you are a Spark user that prefers to work in Python and Pandas, this is a cause to be excited over! The initial work is limited to collecting a Spark DataFrame with toPandas(), which I will discuss below, however there are many additional improvements that are currently underway.

Optimizing Spark Conversion to Pandas

The previous way of converting a Spark DataFrame to Pandas with DataFrame.toPandas() in PySpark was painfully inefficient. Basically, it worked by first collecting all rows to the Spark driver. Next, each row would get serialized into Python’s pickle format and sent to a Python worker process. This child process unpickles each row into a huge list of tuples. Finally, a Pandas DataFrame is created from the list using pandas.DataFrame.from_records().
This all might seem like standard procedure, but suffers from 2 glaring issues: 1) even using CPickle, Python serialization is a slow process and 2) creating a pandas.DataFrame using from_records must slowly iterate over the list of pure Python data and convert each value to Pandas format. See here for a detailed analysis.
Here is where Arrow really shines to help optimize these steps: 1) Once the data is in Arrow memory format, there is no need to serialize/pickle anymore as Arrow data can be sent directly to the Python process, 2) When the Arrow data is received in Python, then pyarrow can utilize zero-copy methods to create a pandas.DataFrame from entire chunks of data at once instead of processing individual scalar values. Additionally, the conversion to Arrow data can be done on the JVM and pushed back for the Spark executors to perform in parallel, drastically reducing the load on the driver.
As of the merging of SPARK-13534, the use of Arrow when calling toPandas() needs to be enabled by setting the SQLConf “spark.sql.execution.arrow.enabled” to “true”. Let’s look at a simple usage example.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT

Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.

In [1]: from pyspark.sql.functions import rand
   ...: df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
   ...: df.printSchema()
 |-- id: long (nullable = false)
 |-- x: double (nullable = false)

In [2]: %time pdf = df.toPandas()
CPU times: user 17.4 s, sys: 792 ms, total: 18.1 s
Wall time: 20.7 s

In [3]: spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [4]: %time pdf = df.toPandas()
CPU times: user 40 ms, sys: 32 ms, total: 72 ms                                 
Wall time: 737 ms

In [5]: pdf.describe()
                 id             x
count  4.194304e+06  4.194304e+06
mean   2.097152e+06  4.998996e-01
std    1.210791e+06  2.887247e-01
min    0.000000e+00  8.291929e-07
25%    1.048576e+06  2.498116e-01
50%    2.097152e+06  4.999210e-01
75%    3.145727e+06  7.498380e-01
max    4.194303e+06  9.999996e-01
This example was run locally on my laptop using Spark defaults so the times shown should not be taken precisely. Even though, it is clear there is a huge performance boost and using Arrow took something that was excruciatingly slow and speeds it up to be barely noticeable.

Notes on Usage

Here are some things to keep in mind before making use of this new feature. At the time of writing this, pyarrow will not be installed automatically with pyspark and needs to be manually installed, see installation instructions. It is planned to add pyarrow as a pyspark dependency so that > pip install pyspark will also install pyarrow.
Currently, the controlling SQLConf is disabled by default. This can be enabled programmatically as in the example above or by adding the line “spark.sql.execution.arrow.enabled=true” to SPARK_HOME/conf/spark-defaults.conf.
Also, not all Spark data types are currently supported and limited to primitive types. Expanded type support is in the works and expected to also be in the Spark 2.3 release.

Thursday, May 2, 2019

Parallel model training in R


cl <- makeCluster(5)


list.model <- foreach(jj=1:nrow(iter.Seq), .packages = c('hmeasure', 'mlr', 'ada', 'pROC', 'ggplot2', 'data.table', "plyr")) %dopar% {

#-- Do stuff here

  list(model.Final, train.set, test.set, train.decile, test.decile, HMeasure(pred.trn$truth, pred.trn$prob.1)$metrics, HMeasure(pred.tst$truth, pred.tst$prob.1)$metrics)

Setup atom for scala projects

Download atom

then in settings -> install

language-scala   : For formating the scala language
terminal-tab : to open a terminal

1. Packages -> Terminal -> Open terminal

2. type spark-shell

3. drag terminal to the right and code to the left

4. Get the template from

Wednesday, May 1, 2019

Connect spark with a jdbc database


/ Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF =
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 =
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 =
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")

  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \

jdbcDF2 = \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

Speed up spark with small data

Spark is faster than Python/R only when it has massive data, then parallelization is powerful. For small data sizes, the parameter partitions should be reduced.

Partitions are the number of chunks the data is broken down too before parallel processing. With small data, it should not have a lot of partitions because the overhead of setting up the shuffles is large

Dont forget to set local[*]

Python will definitely perform better compared to pyspark on smaller data sets. You will see the difference when you are dealing with larger data sets.
By default when you run spark in SQL Context or Hive Context it will use 200 partitions by default. You need to change it to 10 or what ever valueby using sqlContext.sql("set spark.sql.shuffle.partitions=10");. It will be definitely faster than with default.
1) My dataset is about 220,000 records, 24 MB, and that's not a big enough dataset to show the scaling advantages of Spark.
You are right, you will not see much difference at lower volumes. Spark can be slower as well.
2) My spark is running locally and I should run it in something like Amazon EC instead.
For your volume it might not help much.
3) Running locally is okay, but my computing capacity just doesn't cut it. It's a 8 Gig RAM 2015 Macbook.
Again it does not matter for 20MB data set.
4) Spark is slow because I'm running Python. If I'm using Scala it would be much better. (Con argument: I heard lots of people are using PySpark just fine.)
On stand alone there will be difference. Python has more run time overhead than scala, but on larger cluster with distributed capability it need not matter

Loud fan of desktop

