- Ansible
- Athena
- Docker
- EC2
- jq
- MongoDB
- Python (General)
- Pandas
- Pytest tricks
- Plotly
- Poetry
- R
- S3 CLI
- Spark
- Terminal hacks
- VSCode
from botocore.exceptions import BotoCoreError, ClientError


def create_or_update_glue_crawler_tags(connection, module, glue_crawler, changed):
    account_id = module.client('sts').get_caller_identity().get('Account')
    region = connection.meta.region_name
    glue_crawler_arn = (
        "arn:aws:glue:{region}:{account_id}:crawler/{glue_crawler}"
        .format(region=region, account_id=account_id, glue_crawler=glue_crawler["Name"])
    )
    current_tags = connection.get_tags(ResourceArn=glue_crawler_arn)["Tags"]

    if module.params.get("tags") is None and current_tags == dict():
        # No current tags and no tags provided in task
        return changed

    user_tags = module.params.get("tags")
    if user_tags == current_tags:
        # Current tags are the same as those provided in the task
        return changed

    # Current tags are different from those provided
    # First, remove all existing tags
    try:
        connection.untag_resource(ResourceArn=glue_crawler_arn, TagsToRemove=list(current_tags.keys()))
    except (BotoCoreError, ClientError) as e:
        module.fail_json_aws(e, "Unable to remove existing tags")

    # Now set the new tags
    if user_tags is None:
        return True
    try:
        connection.tag_resource(
            ResourceArn=glue_crawler_arn,
            TagsToAdd=user_tags
        )
    except (BotoCoreError, ClientError) as e:
        module.fail_json_aws(e, "Unable to tag crawler")
    return True
where:

- module = AnsibleAWSModule(...)
- connection = module.client('glue')
- glue_crawler is the crawler name
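For context, here is a hedged sketch of how such a helper could be wired into the module's main flow; the surrounding create_or_update_glue_crawler function, its arguments, and the "name" parameter are assumptions for illustration, not part of the original module:

# Hypothetical wiring -- only a sketch, not the actual module code
def create_or_update_glue_crawler(connection, module):
    changed = False
    # ... create or update the crawler itself here, updating `changed` accordingly ...
    glue_crawler = connection.get_crawler(Name=module.params.get("name"))["Crawler"]
    changed = create_or_update_glue_crawler_tags(connection, module, glue_crawler, changed)
    module.exit_json(changed=changed)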
- Your data is stored on S3 with the following structure:
s3://my_bucket/base/path/dt=YYYY-MM-DD/
- Each partition contains JSON files
IMPORTANT: If you're trying to change something in the schema, don't forget to DROP
the table first!
CREATE EXTERNAL TABLE IF NOT EXISTS `db_name.table_name` (
`field1` string,
`field2` string,
`field3` float,
`field4` int,
`field5` int,
`field6` string,
`field7` string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION 's3://my_bucket/base/path/'
TBLPROPERTIES ('has_encrypted_data'='true');
MSCK REPAIR TABLE `db_name.table_name`
SELECT *
FROM db_name.table_name
WHERE dt > '2020-03-03'
LIMIT 5;
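If you prefer to drive these statements from Python, here is a minimal sketch using boto3; the database name and the S3 output location are placeholders, and running Athena via boto3 is an addition to the recipe above, not part of it:

import boto3

def run_athena_statement(sql: str) -> str:
    # Submit a statement to Athena and return its execution id.
    # "db_name" and the output location are placeholders.
    client = boto3.client("athena")
    response = client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "db_name"},
        ResultConfiguration={"OutputLocation": "s3://my_bucket/athena-results/"},
    )
    return response["QueryExecutionId"]

run_athena_statement("MSCK REPAIR TABLE `db_name.table_name`")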
FROM python:3.7.3-alpine3.9
# Install dependencies needed for building of pandas+numpy
# See: https://stackoverflow.com/a/38571314/671013
RUN apk --update add --no-cache \
lapack-dev \
gcc \
freetype-dev
RUN apk add --no-cache --virtual .build-deps \
gfortran \
musl-dev \
g++
RUN ln -s /usr/include/locale.h /usr/include/xlocale.h
# Install python dependencies. Needed to run
RUN pip3 install --no-cache-dir Cython
RUN pip3 install --no-cache-dir numpy
RUN pip3 install --no-cache-dir pandas
SSDs provided with a spawned EC2 instance are not mounted automatically. Check out this answer.
touch res.csv && echo "col1,col2,col3" > res.csv && cat input.json | jq -r '[.col1, .col2, .col3] | @csv' >> res.csv
This is correct, assuming that input.json
has the form:
{"col1": 10, "col2": "foo", "col3": true}
{"col1": 11, "col2": "boo", "col3": false}
{"col1": 11, "col2": "goo", "col3": true}
cat data.json | jq -c '. | {col1: .col1 | .[0:10], col2: .col2, nested1: .nest.foo, nested2: .nest.bar}' > flatten.json
In the example above, only the slice [0:10] of col1 is kept.
db.getCollection('collectionName').find(
    {
        userId: {$eq: "someID_123"},
        contentType: {$eq: "THE_CONTENT_TYPE"},
        "content.some.nestedField": {$exists: 1},
        createdDate: {$gt: ISODate("2019-05-08")}
    }
)
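The same query from Python with pymongo could look roughly like the following; the connection URI and the database name are assumptions:

import datetime

import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017/")
cursor = client["myDB"]["collectionName"].find(
    {
        "userId": "someID_123",
        "contentType": "THE_CONTENT_TYPE",
        "content.some.nestedField": {"$exists": True},
        "createdDate": {"$gt": datetime.datetime(2019, 5, 8)},
    }
)
for doc in cursor:
    print(doc)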
db.getCollection('myCollection').aggregate([
    {
        "$project": {
            "dow": {"$dayOfWeek": "$publishDate"}
        }
    },
    {
        "$group": {
            "_id": "$dow",
            "count": { "$sum": 1 }
        }
    },
    {
        "$project": {
            "dowName": {
                "$cond": [ {"$eq": [1, "$_id"]}, "Sun", {
                    "$cond": [ {"$eq": [2, "$_id"]}, "Mon", {
                        "$cond": [ {"$eq": [3, "$_id"]}, "Tue", {
                            "$cond": [ {"$eq": [4, "$_id"]}, "Wed", {
                                "$cond": [ {"$eq": [5, "$_id"]}, "Thu", {
                                    "$cond": [ {"$eq": [6, "$_id"]}, "Fri", "Sat"]
                                }]
                            }]
                        }]
                    }]
                }]
            },
            "count": "$count"
        }
    },
    { "$sort": { "_id": 1 } }
])
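Reusing the pymongo client from the sketch above, the same kind of pipeline can be run from Python as well; the day-name mapping via the nested $cond is dropped here for brevity:

pipeline = [
    {"$project": {"dow": {"$dayOfWeek": "$publishDate"}}},
    {"$group": {"_id": "$dow", "count": {"$sum": 1}}},
    {"$sort": {"_id": 1}},
]
for doc in client["myDB"]["myCollection"].aggregate(pipeline):
    print(doc)  # e.g. {'_id': 1, 'count': 42}, where _id 1 means Sunday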
import datetime
import logging
import os

import jsonlines
import pymongo

LOGGER = logging.getLogger(__name__)


def insert_data_to_db(filename: str):
    """Insert data (one JSON per line) to DB from a file

    Each data point is enriched with the timestamp of insertion.
    The DB's location is provided by an environment variable.

    Parameters
    ----------
    filename : str
        Path to a json-lines file where each line corresponds to a valid
        JSON of the data
    """
    LOGGER.info(f"About to upload the data in {filename} to DB")
    app_db_uri = os.getenv("APP_DB_URI", "mongodb://localhost:27017/")
    client = pymongo.MongoClient(app_db_uri)
    db = client["myDB"]
    collection = db["myCollection"]
    with jsonlines.open(filename) as reader:
        for obj in reader:
            obj["inserted_ts"] = datetime.datetime.utcnow()
            collection.insert_one(obj)
    client.close()
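A hypothetical usage, writing a couple of JSON lines to a file and then loading them (the filename is a placeholder):

import jsonlines

with jsonlines.open("sample.jsonl", mode="w") as writer:
    writer.write({"foo": "bar"})
    writer.write({"foo": "baz"})

insert_data_to_db("sample.jsonl")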
Create the following utils:
import json

import requests
from pydantic import HttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict


class SlackIntegration(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="DEALFLOW_SLACK_")

    web_hook_url: HttpUrl


def send_message(
    message: str, slack_integration: SlackIntegration | None = None
) -> int:
    if slack_integration is None:
        slack_integration = SlackIntegration()
    # Define the payload
    payload = {"text": message}
    # Define the URL (cast the HttpUrl to a plain string to be explicit)
    url = str(slack_integration.web_hook_url)
    # Define headers
    headers = {"Content-type": "application/json"}
    # Make the POST request
    response = requests.post(url, data=json.dumps(payload), headers=headers)
    return response.status_code
Next, in your program/module/etc. you can use something like:
import sys
from shared.infra.slack import send_message, SlackIntegration
slackintegration = SlackIntegration()
def my_except_hook(exctype, value, traceback):
    send_message(f"Error: {value}", slack_integration=slackintegration)
    sys.__excepthook__(exctype, value, traceback)


sys.excepthook = my_except_hook

1 / 0
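For a quick manual check, assuming pydantic-settings' default environment-variable resolution (the webhook URL below is a placeholder):

import os

os.environ["DEALFLOW_SLACK_WEB_HOOK_URL"] = "https://hooks.slack.com/services/XXX/YYY/ZZZ"
print(send_message("Hello from the excepthook setup"))  # expect 200 on success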
Let us start from this dummy frame:
import pandas as pd

df = pd.DataFrame(
    [
        ["a", "2020-12-20", 10],
        ["a", "2020-12-26", 11],
        ["a", "2020-12-22", 10],
        ["b", "2020-12-25", 111],
        ["c", "2020-12-20", 20],
        ["d", "2020-12-05", 1111],
    ],
    columns=["cat", "date", "value"],
)
df["date"] = pd.to_datetime(df.date)
The objective is to get the weekly sum of values per category
We start with:
res0 = (
df
.set_index("date")
.groupby("cat")
.resample("W")["value"].sum()
)
which yields:
cat date
a 2020-12-20 10
2020-12-27 21
b 2020-12-27 111
c 2020-12-20 20
d 2020-12-06 1111
But this is not good enough.
For the b
category we only have one value and we cannot compare it to the weeks where a
has values.
Let's fill the missing weeks:
res1 = (
res0
.unstack()
.fillna(0)
.stack()
.reset_index()
)
which yields:
cat | date | 0 |
---|---|---|
a | 2020-12-06 00:00:00 | 0 |
a | 2020-12-20 00:00:00 | 10 |
a | 2020-12-27 00:00:00 | 21 |
b | 2020-12-06 00:00:00 | 0 |
b | 2020-12-20 00:00:00 | 0 |
b | 2020-12-27 00:00:00 | 111 |
c | 2020-12-06 00:00:00 | 0 |
c | 2020-12-20 00:00:00 | 20 |
c | 2020-12-27 00:00:00 | 0 |
d | 2020-12-06 00:00:00 | 1111 |
d | 2020-12-20 00:00:00 | 0 |
d | 2020-12-27 00:00:00 | 0 |
Now we have aggregations for all the weeks where at least one category had some value.
But what about the week of 2020-12-13
?
No category was active, but it's still interesting...
To nail that we'd need an intermediate step:
res2 = (
df
.set_index("date")
.groupby("cat")
.resample("W")["value"].sum()
.unstack()
.transpose()
)
yielding:
date | a | b | c | d |
---|---|---|---|---|
2020-12-06 00:00:00 | nan | nan | nan | 1111 |
2020-12-20 00:00:00 | 10 | nan | 20 | nan |
2020-12-27 00:00:00 | 21 | 111 | nan | nan |
Next, we re-index the frame and include the missing week:
res3 = (
res2
.reindex(
pd.date_range(
res2.index.min(),
res2.index.max(),
freq="W", # This is important!!!
name="date"
))
.transpose()
.fillna(0)
.stack()
.reset_index()
.sort_values("date")
)
yielding:
cat | date | 0 |
---|---|---|
a | 2020-12-06 00:00:00 | 0 |
b | 2020-12-06 00:00:00 | 0 |
c | 2020-12-06 00:00:00 | 0 |
d | 2020-12-06 00:00:00 | 1111 |
a | 2020-12-13 00:00:00 | 0 |
b | 2020-12-13 00:00:00 | 0 |
c | 2020-12-13 00:00:00 | 0 |
d | 2020-12-13 00:00:00 | 0 |
a | 2020-12-20 00:00:00 | 10 |
b | 2020-12-20 00:00:00 | 0 |
c | 2020-12-20 00:00:00 | 20 |
d | 2020-12-20 00:00:00 | 0 |
a | 2020-12-27 00:00:00 | 21 |
b | 2020-12-27 00:00:00 | 111 |
c | 2020-12-27 00:00:00 | 0 |
d | 2020-12-27 00:00:00 | 0 |
Note that for 2020-12-13
all values are zero!
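As an aside, the same table can be reached without the transpose dance by reindexing res0 against an explicit (cat, week) grid; this is a sketch, not part of the original walkthrough, and the result comes out sorted by category rather than by date:

weeks = pd.date_range(
    res0.index.get_level_values("date").min(),
    res0.index.get_level_values("date").max(),
    freq="W",
    name="date",
)
full_index = pd.MultiIndex.from_product([df.cat.unique(), weeks], names=["cat", "date"])
res4 = res0.reindex(full_index, fill_value=0).reset_index()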
Generate dummy data (taken from here)
Hidden way down in Pandas’ testing module are a number of convenient functions for quickly building quasi-realistic Series and DataFrames:
import pandas.util.testing as tm
tm.N, tm.K = 15, 3 # Module-level default rows/columns
import numpy as np
np.random.seed(444)
tm.makeTimeDataFrame(freq='M').head()
# A B C
# 2000-01-31 0.3574 -0.8804 0.2669
# 2000-02-29 0.3775 0.1526 -0.4803
# 2000-03-31 1.3823 0.2503 0.3008
# 2000-04-30 1.1755 0.0785 -0.1791
# 2000-05-31 -0.9393 -0.9039 1.1837
tm.makeDataFrame().head()
# A B C
# nTLGGTiRHF -0.6228 0.6459 0.1251
# WPBRn9jtsR -0.3187 -0.8091 1.1501
# 7B3wWfvuDA -1.9872 -1.0795 0.2987
# yJ0BTjehH1 0.8802 0.7403 -1.2154
# 0luaYUYvy1 -0.9320 1.2912 -0.2907
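Note that pandas.util.testing has since been deprecated; if these helpers are not available in your pandas version, a rough DIY stand-in could look like this (shape and column names are assumptions):

import numpy as np
import pandas as pd

rng = np.random.default_rng(444)
idx = pd.date_range("2000-01-31", periods=15, freq="M")
dummy = pd.DataFrame(rng.standard_normal((15, 3)), index=idx, columns=list("ABC"))
dummy.head()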
Assume you have a series:
s = pd.Series(["foo", "bar", 1, 2])
and you want to map foo
to 123
and 1
to yoyo
.
First, define the map as a dictionary:
map_dict = {
"foo": 123,
1: "yoyo"
}
Now you can try:
s.map(map_dict)
This yields:
0 123
1 NaN
2 yoyo
3 NaN
dtype: object
and introduces NaNs wherever the value is not a key of the mapping.
If you want to keep the values in s
that are missing from the map, define the following class:
class DictPreserveMissingKeys(dict):
    def __init__(self, *arg, **kw):
        super(DictPreserveMissingKeys, self).__init__(*arg, **kw)

    def __missing__(self, key):
        return key
and run:
s.map(DictPreserveMissingKeys(map_dict))
which yields:
0 123
1 bar
2 yoyo
3 2
dtype: object
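Two shorter alternatives that give the same result, assuming the mapping never intentionally maps a key to NaN:

s.map(map_dict).fillna(s)  # fill the unmapped positions back from s
s.map(lambda x: map_dict.get(x, x))  # or fall back to the original value per element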
Here is a minimal example:
import tempfile
from s3fs import S3FileSystem
import boto3
import json
from moto import mock_s3
@mock_s3
def test_single_file():
    conn = boto3.resource("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="my_bucket")
    s3 = S3FileSystem()
    my_dict = {"foo": "bar"}
    with s3.open("s3://my_bucket/some/dir/foo.json", "w") as f:
        json.dump(my_dict, f)
    with tempfile.TemporaryDirectory() as tmp_dir:
        copy_artifact("s3://my_bucket/some/dir/foo.json", tmp_dir)
        with open(f"{tmp_dir}/foo.json", "r") as f:
            assert json.load(f) == my_dict
Alternatively, and this is especially useful if you need to use the same bucket in many tests, you can use a fixture:
import pytest
import tempfile
from s3fs import S3FileSystem
import boto3
import json
from moto import mock_s3
@pytest.yield_fixture(scope="function")
def s3():
    mocks3 = mock_s3()
    mocks3.start()
    client = boto3.client("s3")
    client.create_bucket(Bucket="my_bucket")
    s3 = S3FileSystem()
    yield s3
    mocks3.stop()


def test_single_file(s3):
    my_dict = {"foo": "bar"}
    with s3.open("s3://my_bucket/some/dir/foo.json", "w") as f:
        json.dump(my_dict, f)
    with tempfile.TemporaryDirectory() as tmp_dir:
        copy_artifact("s3://my_bucket/some/dir/foo.json", tmp_dir)
        with open(f"{tmp_dir}/foo.json", "r") as f:
            assert json.load(f) == my_dict
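For context, copy_artifact is the function under test and is not defined in the snippets; based on its usage it is assumed to look roughly like this sketch:

import os

from s3fs import S3FileSystem


def copy_artifact(s3_path: str, local_dir: str) -> None:
    # Download a single S3 object into local_dir, keeping its basename (assumed behaviour).
    s3 = S3FileSystem()
    s3.get(s3_path, os.path.join(local_dir, os.path.basename(s3_path)))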
See my answer here.
import pandas as pd
import numpy as np
pd.options.plotting.backend = "plotly"
df = pd.DataFrame(
{
"val": np.sin(np.linspace(0, 7, 100))
}
)
df["signal"] = df.val > 0.7
fig = df.plot(y="val")
for idx in df[df.signal].index:
    fig.add_vline(idx)
import pandas as pd
pd.options.plotting.backend = "plotly"
import plotly.graph_objects as go
go.Figure({
'data': my_df.plot().data,
'layout': {
'hovermode': 'x',
'xaxis': {'showspikes': True}
}
})
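The next snippet builds one bar chart per story as stacked subplots; for a self-contained run, hypothetical dummy data along these lines (the column names story_id, foo, and bar are assumptions matching the snippet) could come first:

import pandas as pd
import plotly.express as px
from plotly.subplots import make_subplots

df = pd.DataFrame(
    {
        "story_id": [1, 1, 2, 2],
        "foo": ["a", "b", "a", "b"],
        "bar": [10, 20, 30, 40],
    }
)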
story_ids = df.story_id.unique()
fig = make_subplots(
rows=story_ids.shape[0], cols=1,
subplot_titles=[f"Story ID: {story_id}" for story_id in story_ids]
)
for i, story_id in enumerate(story_ids):
    fig.add_trace(
        px.bar(
            df, x="foo", y="bar", title=f"Title ID: {story_id}"
        ).data[0],
        row=i + 1, col=1,
    )
fig.update_layout(height=2000, width=1000, title_text="Chapter Retention Rate per Story")
fig
- Create a virtual environment for your installation: python -m venv ~/.poetry. In this case, it will be created in the home directory.
- Next, update pip and setuptools as follows: C:\Users\<your-username>\.poetry\Scripts\python.exe -m pip install -U pip setuptools
- Now, you can install poetry: C:\Users\<your-username>\.poetry\Scripts\pip.exe install poetry
- Lastly, it's time to add poetry to your path; add at the end of your PATH list the following: C:\Users\<your-username>\.poetry\Scripts
- Test the installation by restarting your console and running: poetry config -vvv --list. This will also hint to you where poetry is looking for its config file(s).
NOTE: Due to a slow network connection, I had to manually tweak the timeout constant.
This can be done by editing C:\Users\<your-username>\.poetry\.\Lib\site-packages\poetry\utils\constants.py
.
I increased it to 1500
.
I needed that when, for example, installing mypy.
Check out this discord message.
Thanks Clinton!
By first checking that all the NAs are aligned, you can then move on to the other values and make sure they are all equal.
elementwise_equal <- function(a, b) {
  expect_equal(length(a), length(b))
  # All NAs appear in the same places
  expect_equal(length(which((is.na(a) == is.na(b)) == FALSE)), 0)
  expect_equal(
    length(which((a == b) == T)) + length(which(is.na(a == b))),
    length(a)
  )
}
See: jeroen/jsonlite#59 (comment)
jsonlite::stream_in(file("path/to/my/file.json"))
read_gzip_json_from_s3_to_df <- function(path) {
  #' Read a single gzipped JSON file from an S3 location into a dataframe
  #'
  #' The compressed JSON should contain a single object per line,
  #' with no commas or array structure wrapping the objects
  #'
  #' @param path S3 location of an object; e.g. s3://my-bucket/some/folders/file.json.gz
  raw_data <- path %>% get_object %>% rawConnection %>% gzcon %>% jsonlite::stream_in() %>% jsonlite::flatten()
  raw_data
}
read_json_from_s3_to_df <- function(path) {
  #' Read a single JSON file from an S3 location into a dataframe
  #'
  #' The JSON should contain a single object per line,
  #' with no commas or array structure wrapping the objects
  #'
  #' @param path S3 location of an object; e.g. s3://my-bucket/some/folders/file.json
  raw_data <- path %>% get_object %>% rawToChar %>% textConnection %>% jsonlite::stream_in() %>% jsonlite::flatten()
  raw_data
}
zoneId_to_offset_in_seconds <- function(zoneID) {
  #' Given a ZoneID (see the Java definition), return an offset as a signed integer
  #'
  #' @param zoneID a string representing a zone ID. Should have the form "+HH:MM" or "-HH:MM".
  #' May be a single string or a character vector of strings
  tz_sign <- zoneID %>% substr(1, 1) %>% paste0(1) %>% as.numeric()
  hours <- zoneID %>% substr(2, 3) %>% as.numeric()
  minutes <- zoneID %>% substr(5, 6) %>% as.numeric()
  tz_sign * (hours * 3600 + minutes * 60)
}
aws s3 cp s3://my_bucket/the/prefix/with/trailing/ . --recursive --exclude "*" --include "dt=2020-01-*"
ds.select("col_name").collectAsList();
df.select("col_name").collectAsList();
df.select("col_name").distinct().collect()
df = sqlContext.read.json('s3://my_bucket/raw/items/dt=2019-04-*')
# Assuming df is some dataframe/dataset
df.createOrReplaceTempView("data")
spark.sql("SELECT * FROM data WHERE userId = 'foobar'")
See SO
from pyspark.sql.functions import col, countDistinct
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))
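A hypothetical usage, assuming an existing SparkSession named spark and the imports above:

df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "b")], ["x", "y"])
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)).show()
# Both columns have two distinct values in this toy frame.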
The following links helped when figuring out this solution:
- https://stackoverflow.com/a/55590767/671013 (PySpark)
- https://stackoverflow.com/a/47139522/671013 (Scala)
def path_exists(path):
    # spark is a SparkSession
    sc = spark.sparkContext
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
Caveat: The mentioned solution requires the specification of the bucket and it is not a generic solution for all S3 paths.
Consider the following dataframe:
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

# `context` is assumed to be an existing SparkContext
df = SQLContext(context).createDataFrame(
    [
        ('1997-02-28 10:30:45.123',),
        ('1997-02-28 10:30:45',),
        ("1997-02-28 10:30",),
        ("1997-02-28 10",),
        ("1997-02-28",),
    ], ['t'])
df.show()
Now, compare the results of the following:
df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm:ss.SSS').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd').alias('date')).show()
+----------+
| date|
+----------+
|1997-02-28|
| null|
| null|
| null|
| null|
+----------+
+----------+
| date|
+----------+
|1997-02-28|
|1997-02-28|
| null|
| null|
| null|
+----------+
+----------+
| date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
| null|
| null|
+----------+
+----------+
| date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
| null|
+----------+
+----------+
| date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
+----------+
I created this small notebook here with a demo of how PySpark behaves with timezones. This blog post is also interesting.
- See: https://gist.github.com/drorata/23feb478e5723ca7074e79dec630c171
- and: https://gist.github.com/drorata/6f96fccd7735daa4179fa5099c739aaa
Assuming your environment has pyspark
installed and you know where your java
is, the following fixture will provide a sqlContext
which you could use in other tests.
import pytest


@pytest.fixture
def sqlC():
    # For this to work I had to tweak:
    # $ export JAVA_HOME="$(/usr/libexec/java_home --version 1.8)"
    import findspark
    findspark.init()
    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    sc = spark.sparkContext
    return SQLContext(sc)
For example:
def test_foo(sqlC):
    assert True
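A slightly more meaningful check than the placeholder above, still only a sketch:

def test_create_df(sqlC):
    df = sqlC.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
    assert df.count() == 2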
If you have to restore a bunch of dot files (e.g., .gitignore
), then git restore *
won't work for you.
You can use the following:
# Get the list of modified files, keep only the filenames, and restore each one
git status -s |
  sed 's|^ M \(.*\)|\1|' |
  while read -r file; do git restore "$file"; done
Add the following to your .vscode/settings.json
:
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "always"
}
}