## /home/matthew14786/DatabricksTesting/Python-3.7.5/python
## installed (no explicit import) pyarrow version 3.0.0
import numpy as np ## numpy version 1.20.1
import pandas as pd ## pandas version 1.2.3
import pyspark ## pyspark version 7.3.9
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
from pyspark.sql.types import LongType
conf = pyspark.SparkConf()
conf.set("spark.databricks.service.address","https://adb-5093516137124821.1.azuredatabricks.net/")
conf.set("spark.databricks.service.token","<token>")
conf.set("spark.databricks.service.clusterId","1211-161851-undid610")
conf.set("spark.databricks.service.orgId","5093516137124821")
conf.set("spark.databricks.service.port","15001")
conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","false")
conf.set("spark.driver.memory","16g")
conf.set("spark.driver.maxResultSize","16g")
conf.set("spark.ui.enabled","false")
conf.set("spark.sql.inMemoryColumnarStorage.batchSize","1000")
conf.set("spark.sql.execution.arrow.maxRecordsPerBatch","1000")
sc = pyspark.SparkContext(conf=conf)
sc.sparkHome = '/home/matthew14786/.local/lib/python3.7/site-packages/pyspark'
sqlContext = SQLContext(sc)
## Define dataframe row count
L = 365*86400
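## 365*86400 = 31,536,000 rows -- one row per second for a year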
## Create a Spark DataFrame with a single column of sequential integer values from 1 to L-1 (range() excludes the end value)
ID = sqlContext.range(1,L)
Remote = ID.withColumn("ID", ID["id"].cast(IntegerType()))
## Define an arbitrary number of additional columns to add:
## with 7 extra columns (8 total) the collect succeeds,
## with 8 extra columns (9 total) the collect fails
ExtraCols = 8
## Create additional columns by copying the ID column
for i in range(1,ExtraCols + 1):
    Remote = Remote.withColumn("N"+str(i), Remote["ID"])
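## Equivalent single-select construction (a sketch, not part of the original repro):
# Remote = Remote.select("ID", *[Remote["ID"].alias("N" + str(i)) for i in range(1, ExtraCols + 1)])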
## Print out structure of the Remote DataFrame
Remote.printSchema()
## Attempt to collect the Spark DataFrame to a local pandas DataFrame
Local = Remote.toPandas()
## With ExtraCols = 8 and a 64g memory limit, the collect never completes and a stream of random bytes eventually begins printing to the console
## With ExtraCols = 8 and a 16g memory limit, the error message on collect is as follows:
# 21/03/31 10:02:38 WARN SparkServiceRPCClient: Large server response (1073741824 bytes compressed)
# 21/03/31 10:02:49 WARN SparkServiceRPCClient: Unretryable exception for 54e73773-cdc0-4339-9d42-faf0bab4f942: java.lang.OutOfMemoryError: Java heap space
# Exception in thread "serve-Arrow" java.lang.OutOfMemoryError: Java heap space
# at java.util.Arrays.copyOf(Arrays.java:3332)
# at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
# at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
# at java.lang.StringBuffer.append(StringBuffer.java:270)
# at org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
# at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
# at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
# at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
# at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
# at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
# at org.apache.log4j.Category.callAppenders(Category.java:206)
# at org.apache.log4j.Category.forcedLog(Category.java:391)
# at org.apache.log4j.Category.log(Category.java:856)
# at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:401)
# at org.apache.spark.internal.Logging.logWarning(Logging.scala:69)
# at org.apache.spark.internal.Logging.logWarning$(Logging.scala:68)
# at com.databricks.service.SparkServiceRPCClientStub.logWarning(SparkServiceRPCClientStub.scala:61)
# at com.databricks.service.SparkServiceRPCClient.executeRPC0(SparkServiceRPCClient.scala:92)
# at com.databricks.service.SparkServiceRemoteFuncRunner.withRpcRetries(SparkServiceRemoteFuncRunner.scala:234)
# at com.databricks.service.SparkServiceRemoteFuncRunner.executeRPC(SparkServiceRemoteFuncRunner.scala:156)
# at com.databricks.service.SparkServiceRemoteFuncRunner.executeRPCHandleCancels(SparkServiceRemoteFuncRunner.scala:287)
# at com.databricks.service.SparkServiceRemoteFuncRunner.$anonfun$execute0$1(SparkServiceRemoteFuncRunner.scala:118)
# at com.databricks.service.SparkServiceRemoteFuncRunner$$Lambda$1112/1284093949.apply(Unknown Source)
# at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
# at com.databricks.service.SparkServiceRemoteFuncRunner.withRetry(SparkServiceRemoteFuncRunner.scala:135)
# at com.databricks.service.SparkServiceRemoteFuncRunner.execute0(SparkServiceRemoteFuncRunner.scala:113)
# at com.databricks.service.SparkServiceRemoteFuncRunner.$anonfun$execute$1(SparkServiceRemoteFuncRunner.scala:86)
# at com.databricks.service.SparkServiceRemoteFuncRunner$$Lambda$2305/1762499648.apply(Unknown Source)
# at com.databricks.spark.util.Log4jUsageLogger.recordOperation(UsageLogger.scala:210)
# at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:346)
# at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:325)
# at com.databricks.service.SparkServiceRPCClientStub.recordOperation(SparkServiceRPCClientStub.scala:61)
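## Rough size estimate (a back-of-the-envelope check, assuming 4-byte integer columns and
## ignoring Arrow/compression overhead): the first warning reports a response of exactly
## 1073741824 bytes (2^30, i.e. 1 GiB), and the raw column data crosses that size right
## between 8 and 9 total columns:
# rows = 365*86400 - 1          # ~31.5 million
# print(rows * 8 * 4 / 2**30)   # ~0.94 GiB with 8 integer columns (collects)
# print(rows * 9 * 4 / 2**30)   # ~1.06 GiB with 9 integer columns (fails)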
## End Spark Session
sc.stop()