## Interpreter: /home/matthew14786/DatabricksTesting/Python-3.7.5/python
## pyarrow version 3.0.0 installed (used by Spark for Arrow collection, no explicit import)
import numpy as np ## numpy version 1.20.1
import pandas as pd ## pandas version 1.2.3
import pyspark ## pyspark version 7.3.9
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType, LongType
conf = pyspark.SparkConf()
conf.set("spark.databricks.service.address","https://adb-5093516137124821.1.azuredatabricks.net/")
conf.set("spark.databricks.service.token","<token>")
conf.set("spark.databricks.service.clusterId","1211-161851-undid610")
conf.set("spark.databricks.service.orgId","5093516137124821")
conf.set("spark.databricks.service.port","15001")
conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","false")
conf.set("spark.driver.memory","16g")
conf.set("spark.driver.maxResultSize","16g")
conf.set("spark.ui.enabled","false")
conf.set("spark.sql.inMemoryColumnarStorage.batchSize","1000")
conf.set("spark.sql.execution.arrow.maxRecordsPerBatch","1000")
sc = pyspark.SparkContext(conf=conf)
sc.sparkHome = '/home/matthew14786/.local/lib/python3.7/site-packages/pyspark'
sqlContext = SQLContext(sc)
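## Optional sanity check (sketch): confirm the Arrow and driver settings above were
## actually picked up by this context before attempting the collect.
for key in ["spark.sql.execution.arrow.pyspark.enabled",
            "spark.sql.execution.arrow.maxRecordsPerBatch",
            "spark.driver.maxResultSize"]:
    print(key, "=", sc.getConf().get(key))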
## Define dataframe row count (365 * 86400 = 31,536,000 rows)
L = 365*86400
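## Back-of-the-envelope payload estimate: with 9 IntegerType columns (1 ID + 8 extras),
## the raw column data is roughly L * 9 * 4 bytes, which lines up with the ~1 GiB
## "Large server response" warning captured further down.
print("Approximate uncompressed payload: %.2f GiB" % (L * 9 * 4 / 2**30))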
## Create a Spark DataFrame with a single column of sequential integers from 1 to L (range end is exclusive)
ID = sqlContext.range(1,L)
Remote = ID.withColumn("ID", ID["id"].cast(IntegerType()))
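## Note: range() yields a LongType "id" column; casting to IntegerType halves the
## per-value size (4 bytes vs. 8), but the collect below still fails at 9 total columns.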
## Define an arbitrary number of additional columns to add
## 7 extra columns (for a total of 8) collects successfully
## 8 extra columns (for a total of 9) fails to collect
ExtraCols = 8
## Create additional columns by copying
for i in range(1,ExtraCols + 1):
    Remote = Remote.withColumn("N"+str(i), Remote["ID"])
## Print out structure of Remote Dataframe
Remote.printSchema()
## Attempt to collect Spark DataFrame to local pandas data.frame
Local = Remote.toPandas()
## With ExtraCols = 8 and a 64g memory limit, the collect never completes and a stream of random bytes eventually begins printing to the console
## With ExtraCols = 8 and a 16g memory limit, the error message on collect is as follows:
# 21/03/31 10:02:38 WARN SparkServiceRPCClient: Large server response (1073741824 bytes compressed)
# 21/03/31 10:02:49 WARN SparkServiceRPCClient: Unretryable exception for 54e73773-cdc0-4339-9d42-faf0bab4f942: java.lang.OutOfMemoryError: Java heap space
# Exception in thread "serve-Arrow" java.lang.OutOfMemoryError: Java heap space
# at java.util.Arrays.copyOf(Arrays.java:3332)
# at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
# at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
# at java.lang.StringBuffer.append(StringBuffer.java:270)
# at org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
# at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
# at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
# at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
# at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
# at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
# at org.apache.log4j.Category.callAppenders(Category.java:206)
# at org.apache.log4j.Category.forcedLog(Category.java:391)
# at org.apache.log4j.Category.log(Category.java:856)
# at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:401)
# at org.apache.spark.internal.Logging.logWarning(Logging.scala:69)
# at org.apache.spark.internal.Logging.logWarning$(Logging.scala:68)
# at com.databricks.service.SparkServiceRPCClientStub.logWarning(SparkServiceRPCClientStub.scala:61)
# at com.databricks.service.SparkServiceRPCClient.executeRPC0(SparkServiceRPCClient.scala:92)
# at com.databricks.service.SparkServiceRemoteFuncRunner.withRpcRetries(SparkServiceRemoteFuncRunner.scala:234)
# at com.databricks.service.SparkServiceRemoteFuncRunner.executeRPC(SparkServiceRemoteFuncRunner.scala:156)
# at com.databricks.service.SparkServiceRemoteFuncRunner.executeRPCHandleCancels(SparkServiceRemoteFuncRunner.scala:287)
# at com.databricks.service.SparkServiceRemoteFuncRunner.$anonfun$execute0$1(SparkServiceRemoteFuncRunner.scala:118)
# at com.databricks.service.SparkServiceRemoteFuncRunner$$Lambda$1112/1284093949.apply(Unknown Source)
# at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
# at com.databricks.service.SparkServiceRemoteFuncRunner.withRetry(SparkServiceRemoteFuncRunner.scala:135)
# at com.databricks.service.SparkServiceRemoteFuncRunner.execute0(SparkServiceRemoteFuncRunner.scala:113)
# at com.databricks.service.SparkServiceRemoteFuncRunner.$anonfun$execute$1(SparkServiceRemoteFuncRunner.scala:86)
# at com.databricks.service.SparkServiceRemoteFuncRunner$$Lambda$2305/1762499648.apply(Unknown Source)
# at com.databricks.spark.util.Log4jUsageLogger.recordOperation(UsageLogger.scala:210)
# at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:346)
# at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:325)
# at com.databricks.service.SparkServiceRPCClientStub.recordOperation(SparkServiceRPCClientStub.scala:61)
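## Possible workaround (untested sketch): stream rows back with toLocalIterator()
## instead of building a single large RPC response on the server, trading collection
## speed for a bounded response size. This may still be slow and memory-hungry locally.
# Local = pd.DataFrame(Remote.toLocalIterator(), columns=Remote.columns)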
## End Spark Session
sc.stop()