from numpy.random import RandomState
import pyspark.sql.functions as f
from pyspark import StorageLevel


def hashmin_jaccard_spark(
        sdf, node_col, edge_basis_col, suffixes=('A', 'B'),
        n_draws=100, storage_level=None, seed=42, verbose=False):
    """
    Calculate a sparse Jaccard similarity matrix using MinHash.

    Parameters
    sdf (pyspark.sql.DataFrame): a DataFrame containing at least two columns:
        one defining the nodes (similarity between which is to be calculated)
        and one defining the edges (the basis for node comparisons).
    node_col (str): the name of the DataFrame column containing node labels
    edge_basis_col (str): the name of the DataFrame column containing the edge labels
    suffixes (tuple): a tuple of length 2 containing the suffixes to be appended
        to `node_col` in the output
    n_draws (int): the number of permutations to do; this determines the precision
        of the Jaccard similarity (n_draws == 100, the default, results in
        similarity precision up to 0.01)
    storage_level (pyspark.StorageLevel): PySpark object indicating how to persist
        the hashing stage of the process
    seed (int): seed for random state generation
    verbose (bool): if True, print some information about how many records get hashed
    """
    HASH_PRIME = 2038074743
    left_name = node_col + suffixes[0]
    right_name = node_col + suffixes[1]

    rs = RandomState(seed)
    shifts = rs.randint(0, HASH_PRIME - 1, size=n_draws)
    coefs = rs.randint(0, HASH_PRIME - 1, size=n_draws) + 1

    hash_sdf = (
        sdf
        .selectExpr(
            "*",
            *[
                f"((1L + hash({edge_basis_col})) * {a} + {b}) % {HASH_PRIME} as hash{n}"
                for n, (a, b) in enumerate(zip(coefs, shifts))
            ]
        )
        .groupBy(node_col)
        .agg(
            f.array(*[f.min(f"hash{n}") for n in range(n_draws)]).alias("minHash")
        )
        .select(
            node_col,
            f.posexplode(f.col('minHash')).alias('hashIndex', 'minHash')
        )
        .groupby('hashIndex', 'minHash')
        .agg(
            f.collect_list(f.col(node_col)).alias('nodeList'),
            f.collect_set(f.col(node_col)).alias('nodeSet')
        )
    )
    if storage_level is not None:
        hash_sdf = hash_sdf.persist(storage_level)
    hash_count = hash_sdf.count()
    if verbose:
        print('Hash dataframe count:', hash_count)

    adj_sdf = (
        hash_sdf.alias('a')
        .join(hash_sdf.alias('b'), ['hashIndex', 'minHash'], 'inner')
        .select(
            f.col('minHash'),
            f.explode(f.col('a.nodeList')).alias(left_name),
            f.col('b.nodeSet')
        )
        .select(
            f.col('minHash'),
            f.col(left_name),
            f.explode(f.col('nodeSet')).alias(right_name),
        )
        .groupby(left_name, right_name)
        .agg((f.count('*') / n_draws).alias('jaccardSimilarity'))
    )
    return adj_sdf
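For intuition, the core MinHash trick the function relies on can be sketched in plain Python. This is illustrative only: `crc32` stands in for Spark's `hash()`, and the helper names here are not part of the gist.

```python
from random import Random
from zlib import crc32

HASH_PRIME = 2038074743

def minhash_signature(tokens, coefs, shifts):
    # One minimum per permutation, mirroring the f.min(f"hash{n}") step above.
    return [
        min(((1 + crc32(t.encode())) * a + b) % HASH_PRIME for t in tokens)
        for a, b in zip(coefs, shifts)
    ]

def estimated_jaccard(x, y, n_draws=2000, seed=42):
    # Random affine permutations mod a prime, like the shifts/coefs
    # drawn from RandomState in the Spark function.
    rng = Random(seed)
    shifts = [rng.randrange(HASH_PRIME - 1) for _ in range(n_draws)]
    coefs = [rng.randrange(1, HASH_PRIME) for _ in range(n_draws)]
    sx = minhash_signature(x, coefs, shifts)
    sy = minhash_signature(y, coefs, shifts)
    # The fraction of permutations whose minima collide estimates the
    # Jaccard similarity, to within roughly 1/sqrt(n_draws).
    return sum(a == b for a, b in zip(sx, sy)) / n_draws

a = {"Hi", "I", "heard", "about", "Spark"}
b = {"Hi", "I", "hi", "about", "Spark"}
print(len(a & b) / len(a | b))  # exact Jaccard: 4 shared of 6 total
print(estimated_jaccard(a, b))  # MinHash estimate, close to 4/6
```

This is also why `n_draws` controls precision: the estimate is a proportion over `n_draws` trials, so 100 draws resolves similarity in steps of 0.01.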
I'd do something along these lines:

import pyspark.sql.functions as f

sentenceDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["Hi", "I", "hi", "about", "Spark"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "sentence"])

hashmin_jaccard_spark(
    sdf=sentenceDataFrame.select('id', f.explode(f.col('sentence')).alias('words')),
    node_col='id',
    edge_basis_col='words'
)
If you just feed it the entire array, or several columns, then it's going to compare the entire set, not elements of the set.
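A plain-Python sketch of that difference, using illustrative sets rather than the Spark API: exploding gives element-level overlap, while whole arrays match all-or-nothing.

```python
def jaccard(x, y):
    # Exact Jaccard similarity between two sets.
    return len(x & y) / len(x | y)

# Exploded: each word is its own edge, so overlap is partial.
exploded_a = {"Hi", "I", "heard", "about", "Spark"}
exploded_b = {"Hi", "I", "hi", "about", "Spark"}
print(jaccard(exploded_a, exploded_b))  # 4 shared of 6 total -> 0.666...

# Unexploded: the whole tuple is a single edge, so similarity is all-or-nothing.
whole_a = {("Hi", "I", "heard", "about", "Spark")}
whole_b = {("Hi", "I", "hi", "about", "Spark")}
print(jaccard(whole_a, whole_b))  # different tuples -> 0.0
```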
Hi Shaun,
We are trying to use hashmin_jaccard_spark to obtain similarities between words and phrases. However, we always obtain a similarity score of 1. Is there a way to filter out the word from the list of candidate comparison words? Or what are we doing wrong?
Thank you!
I can't give you advice unless I know more about your problem. Can you give me a few example phrase pairs that are coming up as having a similarity of 1.0?
Hi @schaunwheeler,
I am working with @ofurtuna. First of all, thank you for the function: it is really helping us with our task.
Here is a bit of context for our issue: we have a Spark DataFrame of words and we would like to compute the Jaccard distance of each word with every other word; we are trying to identify spelling mistakes. The DataFrame has a column with the words (one word per observation) and another column with the id of the texts these words were extracted from. Each id uniquely identifies a text, but in our DataFrame it is repeated, because there is more than one word for each text, and one word can be found in more than one text and thus have more than one id.
We have used the function with the column of words as both node_col and edge_basis_col. The end result is a DataFrame where every word is paired with itself, with jaccardSimilarity equal to 1. Instead, what we would like is a DataFrame with every word paired with all the others and the corresponding Jaccard similarity value. Is there a way to adjust the function to do so?
We have also tried to use the id column as node_col, and we obtained what we want in terms of structure: every id paired with the others and different values of Jaccard similarity; however, we have no way to link the id back to the specific word it refers to.
I hope this explains our problem to you. If not, we will be glad to give you further clarifications. Any help you can give us would be more than appreciated.
Thank you and have a nice day. :)
You would need to explode each word into individual letters - one letter per record. The words will then be nodes and the letters will be the edge basis column.
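As a plain-Python illustration of that letter-explosion (not the Spark pipeline itself):

```python
def letter_jaccard(w1, w2):
    # Each word becomes its set of letters (the edge basis);
    # the words themselves play the role of nodes.
    s1, s2 = set(w1), set(w2)
    return len(s1 & s2) / len(s1 | s2)

print(letter_jaccard("spark", "sparc"))      # {s,p,a,r,k} vs {s,p,a,r,c} -> 4/6
print(letter_jaccard("recieve", "receive"))  # same letter set, different order -> 1.0
```

The second pair also shows the limitation discussed below: a transposition changes the spelling but not the letter set, so set-based Jaccard cannot see it.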
For what it's worth, I think you should try a different way to find spelling mistakes. Spelling is about order as well as content, and Jaccard similarity does not consider order. You might consider using a spell-checking library within a UDF. Just a suggestion.
I see, thank you very much for your answer!
As you said, spelling needs order as well, so we might follow your advice. In any case, we will try with the 'explosion' and see what we get.
Thank you again!
If I want to process multiple columns, like
sentenceDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"], 'Hi'),
    (1, ["Hi", "I", "hi", "about", "Spark"], 'What'),
    (2, ["Logistic", "regression", "models", "are", "neat"], 'are')
], ["id", "sentence", "sentence2"])
how should I edit the code? Quite confused, as I'm new to this.
What do you hope to do with the words in sentence2? If you just want those words considered in the similarity calculation the same as the array of words in sentence, then just add the word in sentence2 to the array in sentence, explode the array, and use that exploded column as your edge basis column and id as your node column. If you want sentence2 to impact the similarity some other way, then that's beyond the scope of this function.
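A plain-Python analogue of that reshaping (in Spark you would concatenate the two columns into one array before exploding), using the example rows from the question:

```python
rows = [
    (0, ["Hi", "I", "heard", "about", "Spark"], "Hi"),
    (1, ["Hi", "I", "hi", "about", "Spark"], "What"),
    (2, ["Logistic", "regression", "models", "are", "neat"], "are"),
]

# Append the sentence2 word to the sentence array, then "explode":
# one (node, edge) pair per distinct word per row.
pairs = [
    (row_id, word)
    for row_id, sentence, extra in rows
    for word in set(sentence + [extra])
]
print(sorted(pairs)[:3])
```

Those (id, word) pairs are exactly the node/edge-basis shape the function expects: id is node_col, word is edge_basis_col.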
Thanks for the gist. I was writing some unit tests and noticed that the error bounds are out of whack. I think you need to change line 45 from
f"((1L + hash({edge_basis_col})) * {a} + {b}) % {HASH_PRIME} as hash{n}"
to something like
f"((1L + abs(hash({edge_basis_col}) % {HASH_PRIME})) * {a} + {b}) % {HASH_PRIME} as hash{n}"?
From the source where you got the hash function permutations, they cite this paper as proof that this family of hash functions works. But a condition for the proof is that the input to the linear permutation lies in the range [0, HASH_PRIME), whereas hash({edge_basis_col}) can be any integer (even negative), so we need to force it to fall in that range first.
I don't know how Spark's hash() works, so I can't really check if this change actually makes the implementation theoretically sound ... but at least it passes my unit tests for theoretical error bounds.
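A quick plain-Python check of the sign issue. Note that java_mod is a hypothetical helper emulating Java/Spark remainder semantics (which keep the dividend's sign, unlike Python's %); the specific numbers are illustrative.

```python
HASH_PRIME = 2038074743

def java_mod(x, p):
    # Java's % keeps the sign of the dividend; Python's does not,
    # so emulate the Java behaviour explicitly.
    r = x % p
    return r - p if x < 0 and r != 0 else r

a, b = 7, 11
h = -123456789  # a negative value, as Spark's 32-bit hash() can produce

# Original expression: the result can escape [0, HASH_PRIME).
raw = java_mod((1 + h) * a + b, HASH_PRIME)

# Proposed fix: clamp the hash into the field before the permutation.
fixed = ((1 + abs(java_mod(h, HASH_PRIME))) * a + b) % HASH_PRIME

print(raw)    # negative: outside the range the proof assumes
print(fixed)  # non-negative: back inside [0, HASH_PRIME)
```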
How would you pass that info through? Been trying this way, among others....
That will run successfully, but it only outputs a Jaccard similarity of 1.0 or 0, whether the arrays are exactly the same or not. So it is not splitting up the array data in the "sentence" column and finding partial similarities.
Also tried:
But it always outputs this...not any partial similarities:
I don't see in the code how it will handle the multiple fields in any of the situations I have shared.