- Ansible
- Athena
- Docker
- EC2
- jq
- MongoDB
- Python (General)
- Pandas
- Pytest tricks
- Plotly
- Poetry
- R
- S3 CLI
- Spark
- Terminal hacks
- VScode
from botocore.exceptions import BotoCoreError, ClientError

def create_or_update_glue_crawler_tags(connection, module, glue_crawler, changed):
    account_id = module.client('sts').get_caller_identity().get('Account')
    region = connection.meta.region_name
    glue_crawler_arn = (
        "arn:aws:glue:{region}:{account_id}:crawler/{glue_crawler}"
        .format(region=region, account_id=account_id, glue_crawler=glue_crawler["Name"])
    )
    current_tags = connection.get_tags(ResourceArn=glue_crawler_arn)["Tags"]
    if module.params.get("tags") is None and current_tags == dict():
        # No current tags and no tags provided in the task
        return changed
    user_tags = module.params.get("tags")
    if user_tags == current_tags:
        # Current tags are the same as those provided in the task
        return changed
    # Current tags differ from those provided in the task.
    # First, remove all existing tags
    try:
        connection.untag_resource(ResourceArn=glue_crawler_arn, TagsToRemove=list(current_tags.keys()))
    except (BotoCoreError, ClientError) as e:
        module.fail_json_aws(e, "Unable to remove existing tags")
    # Now set the new tags
    if user_tags is None:
        return True
    try:
        connection.tag_resource(
            ResourceArn=glue_crawler_arn,
            TagsToAdd=user_tags
        )
    except (BotoCoreError, ClientError) as e:
        module.fail_json_aws(e, "Unable to tag crawler")
    return True

where:
- module = AnsibleAWSModule(...)
- connection = module.client('glue')
- glue_crawler is the crawler definition (a dict; its Name field is used to build the ARN)
- Your data is stored on S3 with the following structure: s3://my_bucket/base/path/dt=YYYY-MM-DD/
- Each partition contains JSON files
IMPORTANT: If you're trying to change something in the schema, don't forget to DROP the table first!
CREATE EXTERNAL TABLE IF NOT EXISTS `db_name.table_name` (
`field1` string,
`field2` string,
`field3` float,
`field4` int,
`field5` int,
`field6` string,
`field7` string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION 's3://my_bucket/base/path/'
TBLPROPERTIES ('has_encrypted_data'='true');

MSCK REPAIR TABLE `db_name.table_name`;

SELECT *
FROM db_name.table_name
WHERE dt > '2020-03-03'
LIMIT 5;

FROM python:3.7.3-alpine3.9
# Install dependencies needed for building of pandas+numpy
# See: https://stackoverflow.com/a/38571314/671013
RUN apk --update add --no-cache \
    lapack-dev \
    gcc \
    freetype-dev
RUN apk add --no-cache --virtual .build-deps \
    gfortran \
    musl-dev \
    g++
RUN ln -s /usr/include/locale.h /usr/include/xlocale.h
# Install python dependencies. Needed to run
RUN pip3 install --no-cache-dir Cython
RUN pip3 install --no-cache-dir numpy
RUN pip3 install --no-cache-dir pandas
SSDs provided with a spawned EC2 instance are not mounted automatically. Check out this answer.
touch res.csv && echo "col1,col2,col3" > res.csv && cat input.json | jq -r '[.col1, .col2, .col3] | @csv' >> res.csv

This is correct, assuming that input.json has the form:
{"col1": 10, "col2": "foo", "col3": true}
{"col1": 11, "col2": "boo", "col3": false}
{"col1": 11, "col2": "goo", "col3": true}
cat data.json | jq -c '. | {col1: .col1 | .[0:10], col2: .col2, nested1: .nest.foo, nested2: .nest.bar}' > flatten.json

In the example above, only the slice [0:10] (the first ten characters) of col1 is kept.
db.getCollection('collectionName').find(
    {
        userId: {$eq: "someID_123"},
        contentType: {$eq: "THE_CONTENT_TYPE"},
        "content.some.nestedField": {$exists: 1},
        createdDate: {$gt: ISODate("2019-05-08")}
    }
)

db.getCollection('myCollection').aggregate([
    {
        "$project": {
            "dow": {"$dayOfWeek": "$publishDate"}
        }
    },
    {
        "$group": {
            "_id": "$dow",
            "count": { "$sum": 1 }
        }
    },
    {
        "$project": {
            "dowName": {
                "$cond": [ {"$eq": [1, "$_id"]}, "Sun", {
                    "$cond": [ {"$eq": [2, "$_id"]}, "Mon", {
                        "$cond": [ {"$eq": [3, "$_id"]}, "Tue", {
                            "$cond": [ {"$eq": [4, "$_id"]}, "Wed", {
                                "$cond": [ {"$eq": [5, "$_id"]}, "Thu", {
                                    "$cond": [ {"$eq": [6, "$_id"]}, "Fri", "Sat"]
                                }]
                            }]
                        }]
                    }]
                }]
            },
            "count": "$count"
        }
    },
    { "$sort": { "_id": 1 } }
])

import datetime
import logging
import os

import jsonlines
import pymongo

LOGGER = logging.getLogger(__name__)

def insert_data_to_db(filename: str):
    """Insert data (one JSON per line) to DB from a file

    Each data point is enriched with the timestamp of insertion.
    The DB's location is provided by an environment variable.

    Parameters
    ----------
    filename : str
        Path to a json-lines file where each line corresponds to a valid
        JSON of the data
    """
    LOGGER.info(f"About to upload the data in {filename} to DB")
    app_db_uri = os.getenv("APP_DB_URI", "mongodb://localhost:27017/")
    client = pymongo.MongoClient(app_db_uri)
    db = client["myDB"]
    collection = db["myCollection"]
    with jsonlines.open(filename) as reader:
        for obj in reader:
            obj["inserted_ts"] = datetime.datetime.utcnow()
            collection.insert_one(obj)
    client.close()

Create the following utils:
import json

import requests
from pydantic import HttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict

class SlackIntegration(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="DEALFLOW_SLACK_")
    web_hook_url: HttpUrl

def send_message(
    message: str, slack_integration: SlackIntegration | None = None
) -> int:
    if slack_integration is None:
        slack_integration = SlackIntegration()
    # Define the payload
    payload = {"text": message}
    # Define the URL
    url = slack_integration.web_hook_url
    # Define headers
    headers = {"Content-type": "application/json"}
    # Make the POST request
    response = requests.post(url, data=json.dumps(payload), headers=headers)
    return response.status_code

Next, in your program/module/etc. you can use something like:
import sys
from shared.infra.slack import send_message, SlackIntegration
slackintegration = SlackIntegration()
def my_except_hook(exctype, value, traceback):
    send_message(f"Error: {value}", slack_integration=slackintegration)
    sys.__excepthook__(exctype, value, traceback)
sys.excepthook = my_except_hook
1 / 0

Let us start from this dummy frame:
df = pd.DataFrame(
    [
        ["a", "2020-12-20", 10],
        ["a", "2020-12-26", 11],
        ["a", "2020-12-22", 10],
        ["b", "2020-12-25", 111],
        ["c", "2020-12-20", 20],
        ["d", "2020-12-05", 1111]
    ],
    columns=["cat", "date", "value"]
)
df["date"] = pd.to_datetime(df.date)

The objective is to get the weekly sum of values per category.
We start with:
res0 = (
    df
    .set_index("date")
    .groupby("cat")
    .resample("W")["value"].sum()
)

which yields:
cat date
a 2020-12-20 10
2020-12-27 21
b 2020-12-27 111
c 2020-12-20 20
d 2020-12-06 1111
But this is not good enough.
For the b category we only have one value, and we cannot compare it to the weeks where a has values.
Let's fill in the missing weeks:
res1 = (
    res0
    .unstack()
    .fillna(0)
    .stack()
    .reset_index()
)

which yields:
| cat | date | 0 |
|---|---|---|
| a | 2020-12-06 00:00:00 | 0 |
| a | 2020-12-20 00:00:00 | 10 |
| a | 2020-12-27 00:00:00 | 21 |
| b | 2020-12-06 00:00:00 | 0 |
| b | 2020-12-20 00:00:00 | 0 |
| b | 2020-12-27 00:00:00 | 111 |
| c | 2020-12-06 00:00:00 | 0 |
| c | 2020-12-20 00:00:00 | 20 |
| c | 2020-12-27 00:00:00 | 0 |
| d | 2020-12-06 00:00:00 | 1111 |
| d | 2020-12-20 00:00:00 | 0 |
| d | 2020-12-27 00:00:00 | 0 |
Now we have aggregations for all the weeks where at least one category had some value.
But what about the week of 2020-12-13?
No category was active, but it's still interesting...
To nail that we'd need an intermediate step:
res2 = (
    df
    .set_index("date")
    .groupby("cat")
    .resample("W")["value"].sum()
    .unstack()
    .transpose()
)

yielding:
| date | a | b | c | d |
|---|---|---|---|---|
| 2020-12-06 00:00:00 | nan | nan | nan | 1111 |
| 2020-12-20 00:00:00 | 10 | nan | 20 | nan |
| 2020-12-27 00:00:00 | 21 | 111 | nan | nan |
Next, we re-index the frame and include the missing week:
res3 = (
    res2
    .reindex(
        pd.date_range(
            res2.index.min(),
            res2.index.max(),
            freq="W",  # This is important!!!
            name="date"
        )
    )
    .transpose()
    .fillna(0)
    .stack()
    .reset_index()
    .sort_values("date")
)

yielding:
| cat | date | 0 |
|---|---|---|
| a | 2020-12-06 00:00:00 | 0 |
| b | 2020-12-06 00:00:00 | 0 |
| c | 2020-12-06 00:00:00 | 0 |
| d | 2020-12-06 00:00:00 | 1111 |
| a | 2020-12-13 00:00:00 | 0 |
| b | 2020-12-13 00:00:00 | 0 |
| c | 2020-12-13 00:00:00 | 0 |
| d | 2020-12-13 00:00:00 | 0 |
| a | 2020-12-20 00:00:00 | 10 |
| b | 2020-12-20 00:00:00 | 0 |
| c | 2020-12-20 00:00:00 | 20 |
| d | 2020-12-20 00:00:00 | 0 |
| a | 2020-12-27 00:00:00 | 21 |
| b | 2020-12-27 00:00:00 | 111 |
| c | 2020-12-27 00:00:00 | 0 |
| d | 2020-12-27 00:00:00 | 0 |
Note that for 2020-12-13 all values are zero!
Generate dummy data (taken from here)
Hidden way down in Pandas’ testing module are a number of convenient functions for quickly building quasi-realistic Series and DataFrames. (Note: pandas.util.testing is deprecated in recent pandas versions, so these helpers may no longer be available out of the box.)
import pandas.util.testing as tm
tm.N, tm.K = 15, 3 # Module-level default rows/columns
import numpy as np
np.random.seed(444)
tm.makeTimeDataFrame(freq='M').head()
# A B C
# 2000-01-31 0.3574 -0.8804 0.2669
# 2000-02-29 0.3775 0.1526 -0.4803
# 2000-03-31 1.3823 0.2503 0.3008
# 2000-04-30 1.1755 0.0785 -0.1791
# 2000-05-31 -0.9393 -0.9039 1.1837
tm.makeDataFrame().head()
# A B C
# nTLGGTiRHF -0.6228 0.6459 0.1251
# WPBRn9jtsR -0.3187 -0.8091 1.1501
# 7B3wWfvuDA -1.9872 -1.0795 0.2987
# yJ0BTjehH1 0.8802 0.7403 -1.2154
# 0luaYUYvy1 -0.9320 1.2912 -0.2907

Assume you have a series:
s = pd.Series(["foo", "bar", 1, 2])

and you want to map foo to 123 and 1 to yoyo.
First, define the map as a dictionary:
map_dict = {
    "foo": 123,
    1: "yoyo"
}

Now you can try:
s.map(map_dict)

This yields:
0 123
1 NaN
2 yoyo
3 NaN
dtype: object
and introduces NaNs wherever the value is not a key of the mapping.
If you want to keep the values in s that are missing from the map, define the following class:
class DictPreserveMissingKeys(dict):
    def __init__(self, *arg, **kw):
        super(DictPreserveMissingKeys, self).__init__(*arg, **kw)

    def __missing__(self, key):
        return key

and run:

s.map(DictPreserveMissingKeys(map_dict))

which yields:
0 123
1 bar
2 yoyo
3 2
dtype: object
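A lighter alternative (a minimal sketch, not part of the original recipe) is to skip the helper class and pass a callable that falls back to the original value:

s.map(lambda value: map_dict.get(value, value))

This gives the same result, since Series.map also accepts a function.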
Here is a minimal example:
import tempfile
from s3fs import S3FileSystem
import boto3
import json
from moto import mock_s3
@mock_s3
def test_single_file():
    conn = boto3.resource("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="my_bucket")
    s3 = S3FileSystem()
    my_dict = {"foo": "bar"}
    with s3.open("s3://my_bucket/some/dir/foo.json", "w") as f:
        json.dump(my_dict, f)
    with tempfile.TemporaryDirectory() as tmp_dir:
        copy_artifact("s3://my_bucket/some/dir/foo.json", tmp_dir)
        with open(f"{tmp_dir}/foo.json", "r") as f:
            assert json.load(f) == my_dict

Alternatively, and especially useful if you need to use the same bucket in many tests, you can use a fixture:
import pytest
import tempfile
from s3fs import S3FileSystem
import boto3
import json
from moto import mock_s3
@pytest.fixture(scope="function")
def s3():
    mocks3 = mock_s3()
    mocks3.start()
    client = boto3.client("s3")
    client.create_bucket(Bucket="my_bucket")
    s3 = S3FileSystem()
    yield s3
    mocks3.stop()

def test_single_file(s3):
    my_dict = {"foo": "bar"}
    with s3.open("s3://my_bucket/some/dir/foo.json", "w") as f:
        json.dump(my_dict, f)
    with tempfile.TemporaryDirectory() as tmp_dir:
        copy_artifact("s3://my_bucket/some/dir/foo.json", tmp_dir)
        with open(f"{tmp_dir}/foo.json", "r") as f:
            assert json.load(f) == my_dict

See my answer here.
import pandas as pd
import numpy as np
pd.options.plotting.backend = "plotly"
df = pd.DataFrame(
    {
        "val": np.sin(np.linspace(0, 7, 100))
    }
)
df["signal"] = df.val > 0.7
fig = df.plot(y="val")
for idx in df[df.signal].index:
    fig.add_vline(idx)

import pandas as pd
pd.options.plotting.backend = "plotly"
import plotly.graph_objects as go
go.Figure({
    'data': my_df.plot().data,
    'layout': {
        'hovermode': 'x',
        'xaxis': {'showspikes': True}
    }
})

from plotly.subplots import make_subplots
import plotly.express as px

story_ids = df.story_id.unique()
fig = make_subplots(
    rows=story_ids.shape[0], cols=1,
    subplot_titles=[f"Story ID: {story_id}" for story_id in story_ids]
)
for i, story_id in enumerate(story_ids):
    fig.add_trace(
        px.bar(
            df, x="foo", y="bar", title=f"Title ID: {story_id}"
        ).data[0],
        row=i+1, col=1
    )
fig.update_layout(height=2000, width=1000, title_text="Chapter Retention Rate per Story")
fig

- Create a virtual environment for your installation: `python -m venv ~/.poetry`. In this case, it will be created in the home directory.
- Next, update `pip` and `setuptools` as follows: `C:\Users\<your-username>\.poetry\Scripts\python.exe -m pip install -U pip setuptools`.
- Now, you can install `poetry`: `C:\Users\<your-username>\.poetry\Scripts\pip.exe install poetry`.
- Lastly, it's time to add `poetry` to your path; add at the end of your `PATH` list the following: `C:\Users\<your-username>\.poetry\Scripts`.
- Test the installation by restarting your console and running `poetry config -vvv --list`. This will also hint to you where `poetry` is looking for its config file(s).
NOTE: Due to a slow network connection, I had to manually tweak the timeout constant.
This can be done by editing C:\Users\<your-username>\.poetry\.\Lib\site-packages\poetry\utils\constants.py.
I increased it to 1500.
I needed that when, for example, installing mypy.
Check out this Discord message.
Thanks Clinton!
By first checking that all the NAs are aligned, you can then switch to the other values and make sure they are all equal.
elementwise_equal <- function(a, b) {
  expect_equal(length(a), length(b))
  # All NAs appear in the same places
  expect_equal(length(which((is.na(a) == is.na(b)) == FALSE)), 0)
  expect_equal(
    length(which((a == b) == T)) + length(which(is.na(a == b))),
    length(a)
  )
}

See: jeroen/jsonlite#59 (comment)
jsonlite::stream_in(file("path/to/my/file.json"))

read_gzip_json_from_s3_to_df <- function(path) {
  #' Read a single gzipped JSON file from an S3 location into a dataframe
  #'
  #' The compressed JSON should contain a single object per line
  #' with no commas or array structure wrapping the objects
  #'
  #' @param path S3 location of an object; e.g. s3://my-bucket/some/folders/file.json.gz
  raw_data <- path %>% get_object %>% rawConnection %>% gzcon %>% jsonlite::stream_in() %>% jsonlite::flatten()
  raw_data
}

read_json_from_s3_to_df <- function(path) {
  #' Read a single JSON file from an S3 location into a dataframe
  #'
  #' The JSON should contain a single object per line
  #' with no commas or array structure wrapping the objects
  #'
  #' @param path S3 location of an object; e.g. s3://my-bucket/some/folders/file.json
  raw_data <- path %>% get_object %>% rawToChar %>% textConnection %>% jsonlite::stream_in() %>% jsonlite::flatten()
  raw_data
}

zoneId_to_offset_in_seconds <- function(zoneID) {
  #' Given a ZoneID (see the Java definition), return an offset as a signed integer
  #'
  #' @param zoneID a string representing a zone ID. Should have the form "+HH:MM" or "-HH:MM".
  #' May be a single string or a list(?) of strings
  tz_sign <- zoneID %>% substr(1, 1) %>% paste0(1) %>% as.numeric()
  hours <- zoneID %>% substr(2, 3) %>% as.numeric()
  minutes <- zoneID %>% substr(5, 6) %>% as.numeric()
  tz_sign * (hours * 3600 + minutes * 60)
}

aws s3 cp s3://my_bucket/the/prefix/with/trailing/ . --recursive --exclude "*" --include "dt=2020-01-*"

ds.select("col_name").collectAsList();
df.select("col_name").collectAsList();

df.select("col_name").distinct().collect()

df = sqlContext.read.json('s3://my_bucket/raw/items/dt=2019-04-*')

# Assuming df is some dataframe/dataset
df.createOrReplaceTempView("data")
spark.sql("SELECT * FROM data WHERE userId = 'foobar'")

See SO
from pyspark.sql.functions import col, countDistinct
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))

The following links helped when figuring out this solution:
- https://stackoverflow.com/a/55590767/671013 (PySpark)
- https://stackoverflow.com/a/47139522/671013 (Scala)
def path_exists(path):
    # spark is a SparkSession
    sc = spark.sparkContext
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))

Caveat: the solution above binds the filesystem to the bucket extracted from the path, so it is not a generic solution for all S3 paths.
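For example (the bucket and prefix here are hypothetical), you could guard a read with it:

input_path = "s3://my_bucket/raw/items/dt=2019-04-01/"  # hypothetical path
if path_exists(input_path):
    df = spark.read.json(input_path)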
Consider the following dataframe:
df = SQLContext(context).createDataFrame(
    [
        ('1997-02-28 10:30:45.123',),
        ('1997-02-28 10:30:45',),
        ("1997-02-28 10:30",),
        ("1997-02-28 10",),
        ("1997-02-28",),
    ], ['t'])
df.show()

Now, compare the results of the following:

from pyspark.sql import functions as F

df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm:ss.SSS').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH:mm').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd HH').alias('date')).show()
df.select(F.to_date(df.t, 'yyyy-MM-dd').alias('date')).show()

+----------+
| date|
+----------+
|1997-02-28|
| null|
| null|
| null|
| null|
+----------+
+----------+
| date|
+----------+
|1997-02-28|
|1997-02-28|
| null|
| null|
| null|
+----------+
+----------+
| date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
| null|
| null|
+----------+
+----------+
| date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
| null|
+----------+
+----------+
| date|
+----------+
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
|1997-02-28|
+----------+
I created this small notebook here with a demo of how PySpark behaves with timezones (a minimal sketch follows the links below). This blog post is also interesting.
- See: https://gist.github.com/drorata/23feb478e5723ca7074e79dec630c171
- and: https://gist.github.com/drorata/6f96fccd7735daa4179fa5099c739aaa
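As a rough sketch of the kind of behaviour the demo covers (my own minimal example, assuming a local SparkSession), the session time zone spark.sql.session.timeZone controls how timestamp strings are interpreted, so parsing the same wall-clock string under two different session time zones gives two different instants:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([("2020-01-01 12:00:00",)], ["t"])

# Parse the same wall-clock string under two session time zones
spark.conf.set("spark.sql.session.timeZone", "UTC")
df.select(F.unix_timestamp("t").alias("epoch_utc")).show()

spark.conf.set("spark.sql.session.timeZone", "Europe/Berlin")
df.select(F.unix_timestamp("t").alias("epoch_berlin")).show()  # differs by Berlin's UTC offset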
Assuming your environment has pyspark installed and you know where your Java is, the following fixture will provide a sqlContext which you can use in other tests.
import pytest

@pytest.fixture
def sqlC():
    # For this to work I had to tweak:
    # $ export JAVA_HOME="$(/usr/libexec/java_home --version 1.8)"
    import findspark
    findspark.init()
    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext
    spark = SparkSession.builder.master("local[*]").getOrCreate()
    sc = spark.sparkContext
    return SQLContext(sc)

For example:
def test_foo(sqlC):
    assert True

If you have to restore a bunch of dot files (e.g., .gitignore), then git restore * won't work for you.
You can use the following:
git status -s |                                     # Get the list of files
  sed 's|^ M \(.*\)|\1|' |                          # Keep only the filenames
  while read -r file; do git restore "$file"; done  # Restore each file

Add the following to your .vscode/settings.json:
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "always"
}
}