SparkR Error Collecting Large SparkDataFrame
library(arrow)
## Open source Apache Spark downloaded from this archive:
## https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
library(SparkR, lib.loc = "~/DatabricksTesting/spark-3.0.1-bin-hadoop2.7/R/lib/")

## $ java -version
## openjdk version "1.8.0_212"
## OpenJDK Runtime Environment (build 1.8.0_212-8u212-b03-0ubuntu1.16.04.1-b03)
## OpenJDK 64-Bit Server VM (build 25.212-b03, mixed mode)
## Using databricks-connect 7.3.9 as the pyspark backend, with Python 3.7.5
## Connecting to a Databricks cluster running Databricks Runtime 7.3 LTS (includes Apache Spark 3.0.1, Scala 2.12)
## Installation/configuration steps followed: https://docs.databricks.com/dev-tools/databricks-connect.html
Sys.setenv(SPARK_HOME = '/home/matthew14786/.local/lib/python3.7/site-packages/pyspark')
sc <- sparkR.session(
  sparkConfig = list(
    spark.databricks.service.address = 'https://adb-5093516137124821.1.azuredatabricks.net/',
    spark.databricks.service.token = "<Token>",
    spark.databricks.service.clusterId = '1211-161851-undid610',
    spark.databricks.service.orgId = '5093516137124821',
    spark.databricks.service.port = '15001',
    spark.sql.execution.arrow.sparkr.enabled = "true",
    spark.driver.memory = "16g",
    spark.driver.maxResultSize = "16g",
    spark.ui.enabled = "false",
    spark.sql.inMemoryColumnarStorage.batchSize = as.integer(1e5),
    spark.sql.execution.arrow.maxRecordsPerBatch = as.integer(1e5),
    spark.submit.deployMode = "client"
  ),
  enableHiveSupport = FALSE
)

## Print out Spark Session Configuration
SparkR::sparkR.conf()
# spark.app.id                                 "local-1614716631621"
# spark.app.name                               "SparkR"
# spark.driver.host                            "hprstudio01.hpinc.com"
# spark.driver.maxResultSize                   "16g"
# spark.driver.memory                          "16g"
# spark.driver.port                            "42842"
# spark.executor.id                            "driver"
# spark.executorEnv.LD_LIBRARY_PATH            "$LD_LIBRARY_PATH:/opt/R/3.6.0/lib/R/lib::/lib:/usr/local/lib:/usr/lib/x86_64-linux-gnu:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server"
# spark.home                                   "/home/matthew14786/.local/lib/python3.7/site-packages/pyspark"
# spark.master                                 "local[*]"
# spark.r.sql.derby.temp.dir                   "/tmp/RtmpsU70s6"
# spark.sql.execution.arrow.maxRecordsPerBatch "100000"
# spark.sql.execution.arrow.sparkr.enabled     "true"
# spark.sql.inMemoryColumnarStorage.batchSize  "100000"
# spark.submit.deployMode                      "client"
# spark.submit.pyFiles                         ""
# spark.ui.enabled                             "false"
# spark.ui.showConsoleProgress                 "true"
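
## Optional sanity check, my own addition (a sketch using SparkR's exported JVM
## bridge helpers sparkR.callJStatic()/sparkR.callJMethod()): ask the local
## driver JVM for its actual max heap, to confirm the 16g setting above really
## took effect. Relevant because the failure below is a "Java heap space" error
## in the local driver's "serve-Arrow" thread.
runtime <- sparkR.callJStatic("java.lang.Runtime", "getRuntime")
sparkR.callJMethod(runtime, "maxMemory") / 1024^3   # reported max heap, in GiB
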
## Define dataframe row count (one year of seconds: 365 * 86400 = 31,536,000 rows)
L <- 365*86400

## Create a local dataframe with a single column containing sequential integer values from 1 to L
Original <- data.frame(Index = seq_len(L))

## Copy the data frame to the Spark cluster
Remote <- SparkR::as.DataFrame(Original)

## Define an arbitrary number of additional columns to add.
## In my environment, 7 extra columns (8 total) collect successfully,
## but 8 extra (9 total) throw the error message included below.
ExtraCols <- 8
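
## Back-of-the-envelope payload estimate (my own rough math, not taken from the
## Spark or Databricks docs): each integer column is ~4 bytes per row
## uncompressed, so 9 columns of 31,536,000 rows is a little over 1 GiB while
## 8 columns stays just under it. That is at least consistent with the
## "1073741824 bytes compressed" (exactly 1 GiB) warning printed by collect() below.
(1 + ExtraCols) * L * 4 / 1024^3
# [1] 1.057327
ExtraCols * L * 4 / 1024^3
# [1] 0.9398461
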
## Create additional columns by copying
for(i in seq_len(ExtraCols)){
  Remote[[paste0("N",i)]] <- Remote[["Index"]]
}
## Print out structure of Remote Dataframe
str(Remote)
# 'SparkDataFrame': 9 variables:
#  $ Index: int 1 2 3 4 5 6
#  $ N1   : int 1 2 3 4 5 6
#  $ N2   : int 1 2 3 4 5 6
#  $ N3   : int 1 2 3 4 5 6
#  $ N4   : int 1 2 3 4 5 6
#  $ N5   : int 1 2 3 4 5 6
#  $ N6   : int 1 2 3 4 5 6
#  $ N7   : int 1 2 3 4 5 6
#  $ N8   : int 1 2 3 4 5 6
## Attempt to collect Spark DataFrame to local R data.frame
Local <- collect(Remote)
# 21/03/02 14:28:31 WARN SparkServiceRPCClient: Large server response (1073741824 bytes compressed)
# 21/03/02 14:28:42 WARN SparkServiceRPCClient: Unretryable exception for b5dc2967-84a0-4a1b-9c19-777cf2ef1b4b: java.lang.OutOfMemoryError: Java heap space
# Error in readBin(con, raw(), as.integer(dataLen), endian = "big") :
#   invalid 'n' argument
# Exception in thread "serve-Arrow" java.lang.OutOfMemoryError: Java heap space
# at java.util.Arrays.copyOf(Arrays.java:3332)
# at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
# at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
# at java.lang.StringBuffer.append(StringBuffer.java:270)
# at org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
# at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
# at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
# at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
# at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
# at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
# at org.apache.log4j.Category.callAppenders(Category.java:206)
# at org.apache.log4j.Category.forcedLog(Category.java:391)
# at org.apache.log4j.Category.log(Category.java:856)
# at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:401)
# at org.apache.spark.internal.Logging.logWarning(Logging.scala:69)
# at org.apache.spark.internal.Logging.logWarning$(Logging.scala:68)
# at com.databricks.service.SparkServiceRPCClientStub.logWarning(SparkServiceRPCClientStub.scala:61)
# at com.databricks.service.SparkServiceRPCClient.executeRPC0(SparkServiceRPCClient.scala:92)
# at com.databricks.service.SparkServiceRemoteFuncRunner.withRpcRetries(SparkServiceRemoteFuncRunner.scala:234)
# at com.databricks.service.SparkServiceRemoteFuncRunner.executeRPC(SparkServiceRemoteFuncRunner.scala:156)
# at com.databricks.service.SparkServiceRemoteFuncRunner.executeRPCHandleCancels(SparkServiceRemoteFuncRunner.scala:287)
# at com.databricks.service.SparkServiceRemoteFuncRunner.$anonfun$execute0$1(SparkServiceRemoteFuncRunner.scala:118)
# at com.databricks.service.SparkServiceRemoteFuncRunner$$Lambda$894/3541916.apply(Unknown Source)
# at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
# at com.databricks.service.SparkServiceRemoteFuncRunner.withRetry(SparkServiceRemoteFuncRunner.scala:135)
# at com.databricks.service.SparkServiceRemoteFuncRunner.execute0(SparkServiceRemoteFuncRunner.scala:113)
# at com.databricks.service.SparkServiceRemoteFuncRunner.$anonfun$execute$1(SparkServiceRemoteFuncRunner.scala:86)
# at com.databricks.service.SparkServiceRemoteFuncRunner$$Lambda$2510/2059484210.apply(Unknown Source)
# at com.databricks.spark.util.Log4jUsageLogger.recordOperation(UsageLogger.scala:210)
# at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:346)
# at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:325)
# at com.databricks.service.SparkServiceRPCClientStub.recordOperation(SparkServiceRPCClientStub.scala:61)
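
## A possible workaround, included here only as a sketch (not verified against
## this exact databricks-connect setup): collect the SparkDataFrame in slices on
## the Index column and rbind the pieces locally, so no single RPC response has
## to carry the full ~1 GiB payload. collectInChunks is a hypothetical helper of
## my own, and chunkRows = 5e6 is just an illustrative value.
collectInChunks <- function(sdf, totalRows, chunkRows = 5e6) {
  starts <- seq(1, totalRows, by = chunkRows)
  pieces <- lapply(starts, function(start) {
    slice <- SparkR::filter(sdf, sdf$Index >= start & sdf$Index < start + chunkRows)
    SparkR::collect(slice)
  })
  do.call(rbind, pieces)
}
# Local <- collectInChunks(Remote, totalRows = L)
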
SparkR::sparkR.session.stop()

## R Environment
sessionInfo()
# R version 3.6.0 (2019-04-26)
# Platform: x86_64-pc-linux-gnu (64-bit)
# Running under: Ubuntu 16.04.6 LTS
#
# Matrix products: default
# BLAS: /usr/lib/libblas/libblas.so.3.6.0
# LAPACK: /usr/lib/lapack/liblapack.so.3.6.0
#
# locale:
# [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8 LC_MONETARY=en_US.UTF-8
# [6] LC_MESSAGES=en_US.UTF-8 LC_PAPER=en_US.UTF-8 LC_NAME=C LC_ADDRESS=C LC_TELEPHONE=C
# [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
#
# attached base packages:
# [1] stats graphics grDevices utils datasets methods base
#
# other attached packages:
# [1] SparkR_3.0.1 arrow_3.0.0
#
# loaded via a namespace (and not attached):
# [1] tidyselect_1.1.0 bit_4.0.4 compiler_3.6.0 magrittr_1.5 assertthat_0.2.1 R6_2.4.1 tools_3.6.0 glue_1.4.2
# [9] bit64_4.0.5 vctrs_0.3.1 packrat_0.5.0 rlang_0.4.7 purrr_0.3.4