@yuvalif
Last active June 13, 2024 15:27
# bucket notification parser according to the "pacific" formatting:
# https://github.com/ceph/ceph/blob/pacific/src/rgw/rgw_pubsub.h
# Note: the JSON formatting does not match the one defined at:
# https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
# but it includes all the data except: metadata, tags and opaque data
import json
import struct
import sys


def parse_string(buff, offset):
    # a string is encoded as a 32-bit little-endian length followed by its UTF-8 bytes
    size = struct.unpack_from('<I', buff, offset)[0]
    start = offset + 4
    end = start + size
    return buff[start:end].decode('utf-8'), end


def parse_time(buff, offset):
    # a timestamp is encoded as 32-bit seconds followed by 32-bit nanoseconds
    sec, nsec = struct.unpack_from('<II', buff, offset)
    # nanoseconds are a 9-digit fraction of a second
    return '%d.%09d' % (sec, nsec), offset + 8


def parse_int64(buff, offset):
    # a 64-bit unsigned little-endian integer (the object size)
    t = struct.unpack_from('<Q', buff, offset)[0]
    return str(t), offset + 8


# event fields, in the order in which they are encoded
FIELDS = [
    ('eventVersion', parse_string),
    ('eventSource', parse_string),
    ('awsRegion', parse_string),
    ('eventTime', parse_time),
    ('eventName', parse_string),
    ('userIdentity', parse_string),
    ('sourceIPAddress', parse_string),
    ('x_amz_request_id', parse_string),
    ('x_amz_id_2', parse_string),
    ('s3SchemaVersion', parse_string),
    ('configurationId', parse_string),
    ('bucketName', parse_string),
    ('bucketOwnerIdentity', parse_string),
    ('bucketARN', parse_string),
    ('objectKey', parse_string),
    ('objectSize', parse_int64),
    ('objectEtag', parse_string),
    ('objectVersion', parse_string),
    ('objectSequencer', parse_string),
    ('eventId', parse_string),
    ('bucketId', parse_string),
]

# every event starts with its eventVersion: the string "2.2" with a 4-byte length prefix of 3.
# searching for it is a heuristic that skips over the queue's own headers
EVENT_MARKER = b'\x03\x00\x00\x002.2'

if len(sys.argv) != 2:
    print('usage: ' + sys.argv[0] + ' <queue file name>', file=sys.stderr)
    sys.exit(1)

fileName = sys.argv[1]
try:
    with open(fileName, mode='rb') as file:
        fileContent = file.read()
except OSError as err:
    print('failed to open/read file. error: ' + str(err), file=sys.stderr)
    sys.exit(1)

events = []
offset = 0
while True:
    offset = fileContent.find(EVENT_MARKER, offset)
    if offset == -1:
        break
    event = {}
    for name, parse in FIELDS:
        # every parser returns the field value and the offset right after it
        event[name], offset = parse(fileContent, offset)
    events.append(event)

# json.dumps escapes quotes and backslashes (e.g. in object keys), and prints [] when no event is found
print(json.dumps(events, indent=2))
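To check the parser without a cluster, one can build a synthetic record in the layout the script reads (little-endian 32-bit length-prefixed strings, a 32-bit seconds plus 32-bit nanoseconds timestamp, a 64-bit object size) and decode it again. This is a minimal sketch under that assumption: `encode_event` and `decode_event` are hypothetical test helpers, not Ceph code, and the encoder writes only the fields the script reads (no metadata, tags or opaque data). The sample values are made up.

```python
import struct

# Field names and encodings, in the order the parser reads them.
# 's' = length-prefixed UTF-8 string, 't' = (sec, nsec) timestamp, 'q' = uint64
FIELDS = [
    ('eventVersion', 's'), ('eventSource', 's'), ('awsRegion', 's'),
    ('eventTime', 't'), ('eventName', 's'), ('userIdentity', 's'),
    ('sourceIPAddress', 's'), ('x_amz_request_id', 's'), ('x_amz_id_2', 's'),
    ('s3SchemaVersion', 's'), ('configurationId', 's'), ('bucketName', 's'),
    ('bucketOwnerIdentity', 's'), ('bucketARN', 's'), ('objectKey', 's'),
    ('objectSize', 'q'), ('objectEtag', 's'), ('objectVersion', 's'),
    ('objectSequencer', 's'), ('eventId', 's'), ('bucketId', 's'),
]


def encode_event(event):
    """Hypothetical test helper: serialize a dict in the layout the parser expects."""
    out = bytearray()
    for name, kind in FIELDS:
        value = event[name]
        if kind == 's':
            data = value.encode('utf-8')
            out += struct.pack('<I', len(data)) + data
        elif kind == 't':
            out += struct.pack('<II', *value)
        else:
            out += struct.pack('<Q', value)
    return bytes(out)


def decode_event(buff, offset):
    """Decode one record starting at offset; returns (dict, offset after the record)."""
    event = {}
    for name, kind in FIELDS:
        if kind == 's':
            size = struct.unpack_from('<I', buff, offset)[0]
            event[name] = buff[offset + 4:offset + 4 + size].decode('utf-8')
            offset += 4 + size
        elif kind == 't':
            event[name] = struct.unpack_from('<II', buff, offset)
            offset += 8
        else:
            event[name] = struct.unpack_from('<Q', buff, offset)[0]
            offset += 8
    return event, offset


# made-up sample event: empty strings everywhere except a few fields
sample = {name: '' for name, kind in FIELDS if kind == 's'}
sample.update(eventVersion='2.2', bucketName='fish', objectKey='key "with" quotes',
              eventTime=(1718291234, 500000000), objectSize=1024)

# 16 zero bytes stand in for the queue object's own headers
blob = b'\x00' * 16 + encode_event(sample)
marker = struct.pack('<I', 3) + b'2.2'   # the length-prefixed eventVersion
start = blob.find(marker)
decoded, end = decode_event(blob, start)
time_str = '%d.%09d' % decoded['eventTime']   # 500000000 ns is half a second
```

The marker search finds the record right after the fake header bytes, and the decoded dict equals the sample, including the timestamp with its nanoseconds rendered as a 9-digit fraction.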

Setup

  • have a running Ceph cluster with an RGW (e.g. using vstart):
MON=1 OSD=1 MDS=0 MGR=0 RGW=1 ../src/vstart.sh -n -d
  • create a persistent topic that points to an invalid endpoint:
aws --endpoint-url http://localhost:8000 sns create-topic --name=fishtopic \
  --attributes='{"push-endpoint":  "kafka://localhost:9999", "persistent": "true"}'
  • create a bucket:
aws --endpoint-url http://localhost:8000 s3 mb s3://fish
  • define a notification on the bucket:
aws --endpoint-url http://localhost:8000 s3api put-bucket-notification-configuration --bucket fish \
  --notification-configuration='{"TopicConfigurations": [{"Id": "notif1", "TopicArn": "arn:aws:sns:default::fishtopic", "Events": []}]}'
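  • upload objects to the bucket. Since the endpoint is unreachable, the notifications stay queued in the persistent topic:
aws --endpoint-url http://localhost:8000 s3 cp <file name> s3://fish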
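The same setup can also be scripted from Python. This is a sketch that assumes boto3 is installed and finds the RGW user's credentials the same way the aws CLI does (environment variables or `~/.aws` config); the helper names are hypothetical. The request payloads are built by plain functions, so they can be checked without a cluster, and `run_setup` is only called explicitly.

```python
ENDPOINT = 'http://localhost:8000'


def topic_attributes(push_endpoint, persistent=True):
    # topic attributes are string values, as in the CLI example
    return {'push-endpoint': push_endpoint,
            'persistent': 'true' if persistent else 'false'}


def notification_configuration(notif_id, topic_arn, events=()):
    # same structure as the JSON passed to put-bucket-notification-configuration
    return {'TopicConfigurations': [
        {'Id': notif_id, 'TopicArn': topic_arn, 'Events': list(events)}]}


def run_setup(bucket='fish', topic='fishtopic', num_objects=10):
    import boto3  # imported lazily, so the payload helpers work without boto3

    sns = boto3.client('sns', endpoint_url=ENDPOINT)
    s3 = boto3.client('s3', endpoint_url=ENDPOINT)
    # persistent topic pointing at an endpoint where nothing listens
    arn = sns.create_topic(
        Name=topic,
        Attributes=topic_attributes('kafka://localhost:9999'))['TopicArn']
    s3.create_bucket(Bucket=bucket)
    s3.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=notification_configuration('notif1', arn))
    for i in range(num_objects):
        # each upload should leave one pending notification in the topic's queue
        s3.put_object(Bucket=bucket, Key='obj%d' % i, Body=b'hello')
```

Calling `run_setup()` against the vstart RGW should leave `num_objects` notifications pending, ready to be dumped with either method.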

With Containerized radosgw-admin

  • use the containerized radosgw-admin to dump the pending notifications from that topic:
podman run --mount type=bind,src=<vstart root dir>,dst=<vstart root dir> -it \
  quay.io/ylifshit/radosgw-admin radosgw-admin -c <vstart root dir>/ceph.conf topic dump --topic fishtopic

Note that the command above mounts the directory in which all the files referenced by ceph.conf reside. In a non-vstart deployment, the directories holding those files should be mounted instead.

  • the radosgw-admin in the container was built off of the squid branch and was tested with reef and quincy.
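When the dump is part of a script, the podman invocation can be assembled in Python. Mounting the vstart root at the same path inside the container keeps the absolute paths in ceph.conf valid there. This is a sketch: `topic_dump_command` and `dump_topic` are hypothetical helpers, the image and subcommand are the ones shown above, and it drops `-it` (no terminal is needed when capturing output) while adding podman's `--rm` so the container is removed afterwards.

```python
import subprocess

IMAGE = 'quay.io/ylifshit/radosgw-admin'


def topic_dump_command(vstart_root, topic):
    # bind-mount the vstart root at the same path, so paths in ceph.conf still resolve
    mount = 'type=bind,src=%s,dst=%s' % (vstart_root, vstart_root)
    return ['podman', 'run', '--rm', '--mount', mount, IMAGE,
            'radosgw-admin', '-c', vstart_root + '/ceph.conf',
            'topic', 'dump', '--topic', topic]


def dump_topic(vstart_root, topic):
    # run the container and return whatever radosgw-admin prints
    result = subprocess.run(topic_dump_command(vstart_root, topic),
                            check=True, capture_output=True, text=True)
    return result.stdout
```

For example, `print(dump_topic('/path/to/ceph/build', 'fishtopic'))`, where the path is a placeholder for the vstart root directory.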

Without radosgw-admin

  • get the list of queue objects:
bin/rados -p default.rgw.log -N notif ls
  • dump a queue object to a file:
bin/rados -p default.rgw.log -N notif get <queue object name> <output file name>
  • use the Python script above (saved as parse-notifications.py) to parse the file:
python parse-notifications.py <output file name> | jq .
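The listing and dumping steps above can be chained for every queue object at once. This sketch shells out to the same `bin/rados` commands, so like them it assumes it runs from the vstart build directory; it writes each object to a file named after it. The helper names and the output directory are hypothetical, and each resulting file can then be passed to the parser script.

```python
import subprocess
from pathlib import Path

RADOS = 'bin/rados'       # relative path, as in the commands above
POOL = 'default.rgw.log'
NAMESPACE = 'notif'


def rados_cmd(*args):
    # common prefix: pool and namespace of the notification queues
    return [RADOS, '-p', POOL, '-N', NAMESPACE, *args]


def list_queue_objects(run=subprocess.run):
    # `rados ls` prints one object name per line
    result = run(rados_cmd('ls'), check=True, capture_output=True, text=True)
    return [line for line in result.stdout.splitlines() if line]


def dump_queue_objects(out_dir, run=subprocess.run):
    # write every queue object to <out_dir>/<object name> and return the paths
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    paths = []
    for name in list_queue_objects(run):
        path = out / name
        run(rados_cmd('get', name, str(path)), check=True)
        paths.append(path)
    return paths
```

The `run` parameter defaults to `subprocess.run`; it is only there so the command handling can be exercised without a cluster.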