from pyspark.sql.types import *

# Auxiliary functions
# Pandas types -> Spark types
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given a pandas dataframe, returns a Spark dataframe
def pandas_to_spark(df_pandas):
    columns = list(df_pandas.columns)
    types = list(df_pandas.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(df_pandas, p_schema)
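A minimal usage sketch, assuming an initialized sqlContext (see the comments below for how to create one); the sample data is made up for illustration:

import pandas as pd

# Hypothetical sample data covering the mapped dtypes
df_pandas = pd.DataFrame({
    'name': ['a', 'b'],                                    # object -> StringType
    'count': [1, 2],                                       # int64 -> LongType
    'score': [0.5, 1.5],                                   # float64 -> FloatType
    'when': pd.to_datetime(['2021-01-01', '2021-01-02']),  # datetime64[ns] -> DateType
})

df_spark = pandas_to_spark(df_pandas)
df_spark.printSchema()
df_spark.show()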
Thank you! For my project, I had to add boolean types to the equivalent_type function.
def equivalent_type(f): ... elif f == "bool": return BooleanType()
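For reference, a sketch of the extended function, assuming the same wildcard import from pyspark.sql.types as in the gist (BooleanType is included in it):

def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    elif f == 'bool': return BooleanType()
    else: return StringType()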
Thank you for sharing the code. I am using PySpark 3.1.2, and running your code raises NameError: name 'sqlContext' is not defined. I used from pyspark.sql import SQLContext instead, but it gives the following error:
File "utils/data.py", line 82, in <module>
sparkdf = pandas_to_spark(df)
File "utils/data.py", line 74, in pandas_to_spark
return SQLContext.createDataFrame(pandas_df, p_schema)
File "/home/ubuntu/anaconda3/envs/python3/lib/python3.6/site-packages/pyspark/sql/context.py", line 369, in createDataFrame
return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
File "/home/ubuntu/anaconda3/envs/python3/lib/python3.6/site-packages/pandas/core/generic.py", line 5141, in __getattr__
return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'sparkSession'
Would you mind taking a look? Apologies if the question is too naive; I have no knowledge of Spark/PySpark. Thank you!
The traceback shows SQLContext.createDataFrame being called on the class itself rather than on an instance, so self ends up bound to the pandas DataFrame. Try creating an instance first; then it should work:

from pyspark import SparkContext, SQLContext
sqlContext = SQLContext(SparkContext.getOrCreate())
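On newer PySpark versions, SQLContext is deprecated in favor of SparkSession, so a SparkSession-based variant should also work. A sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# and the last line of pandas_to_spark becomes:
#     return spark.createDataFrame(df_pandas, p_schema)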
Thanks for this one, it's been very useful on my end!
I am not that experienced in PySpark, but is there a reason why equivalent_type returns a new instance of XxxxType() at each call, rather than using a dictionary like the one below?
equivalent_type_dict = {
    'datetime64[ns]': DateType(),
    'int64': LongType(),
    'int32': IntegerType(),
    'float64': DoubleType(),
    'float32': FloatType(),
}

def define_structure(string, format_type):
    try: typo = equivalent_type_dict[format_type]
    except: typo = StringType()
    return StructField(string, typo)
...
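One caveat with the dictionary version: df_pandas.dtypes yields numpy dtype objects rather than plain strings, so whether the bare dictionary lookup hits depends on how those objects hash. A sketch that normalizes the key with str() and uses .get() for the fallback:

def define_structure(string, format_type):
    # str() turns the numpy dtype object into its name, e.g. 'int64';
    # .get() provides the StringType fallback without a try/except
    typo = equivalent_type_dict.get(str(format_type), StringType())
    return StructField(string, typo)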
I can suggest updating this:

equivalent_type_dict = {
    ...
    'float64': DoubleType(),
    'float32': FloatType(),
    ...
}

because using FloatType will lose some decimals, like this:

Pandas:  9544.145833333334 | float64
SparkDF: 9544.1455         | float
SparkDF: 9544.145833333334 | double
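A minimal repro of the precision loss, assuming an active SparkSession named spark; the printed values are indicative:

from pyspark.sql.types import StructType, StructField, FloatType, DoubleType

value = 9544.145833333334
for spark_type in (FloatType(), DoubleType()):
    schema = StructType([StructField('x', spark_type)])
    df = spark.createDataFrame([(value,)], schema)
    print(spark_type.simpleString(), df.first()['x'])
# float:  ~9544.1455 (rounded to 32-bit precision)
# double:  9544.145833333334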
Thanks for the suggestion, I updated this part in my code.
Thank you for this gist! Works very well!