Last active
August 13, 2019 20:41
-
-
Save schwanksta/291973b16a10476ba7b6c9d942ebac56 to your computer and use it in GitHub Desktop.
Simple Django FOIA CCing code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import models | |
from django.db.models import Q | |
from django.conf import settings | |
from django.utils.functional import cached_property | |
from django.contrib.postgres.fields import HStoreField, ArrayField | |
from datetime import date, datetime, time, timedelta | |
from foiaminder.schedule import SCHEDULES | |
from business_calendar import Calendar | |
import requests | |
class Schedule(models.Model):
    """One dated reminder belonging to a FOIA request.

    Rows are created by ``FOIA.to_schedule`` and are ordered by ``date``,
    so the first row of ``foia.schedule_set`` is the earliest reminder.
    """

    date = models.DateField()
    n = models.IntegerField()  # Which number reminder email this is.
    # True for the trailing "every-after" reminder created by FOIA.to_schedule.
    repeatable = models.BooleanField(default=False)
    # CASCADE was the implicit default before Django 2.0 (where the argument
    # became mandatory), so spelling it out keeps the existing behaviour.
    foia = models.ForeignKey('FOIA', on_delete=models.CASCADE)

    class Meta:
        ordering = ('date',)

    def send_reminder(self):
        """Email the request's sender a reminder through the Mailgun HTTP API.

        Returns:
            The ``requests`` response object of the POST.

        Raises:
            requests.exceptions.RequestException: on network failure or when
                the request exceeds the timeout.
        """
        # NOTE(review): "<MYDOMAIN>" and the "[email protected]" address look like
        # placeholders / scraping residue -- confirm the real values.
        payload = {
            "from": "FOIA Minder <[email protected]>",
            "to": [self.foia.sender],
            # Points at the primary email's Message-Id; presumably so mail
            # clients thread the reminder with the original request.
            "h:References": self.foia.primary_email.message_id,
            "subject": "REMINDER: %s" % self.foia.display_subject(),
            "text": "You have an action due on the data request mentioned above.",
        }
        return requests.post(
            "https://api.mailgun.net/v3/<MYDOMAIN>/messages",
            auth=("api", settings.MAILGUN_API_KEY),
            data=payload,
            # Without a timeout requests waits forever on a stalled connection.
            timeout=30,
        )

    def push_forward(self):
        """
        Pushes us forward by the every-after interval, so we can keep
        on bird-dogging old requests. Never give up, never surrender.

        Moves ``date`` ahead by the schedule's 'every-after' interval,
        increments ``n`` and saves the row.
        """
        repeat = self.foia.get_schedule_data().get('every-after')
        self.date = self.foia.add_days(self.date, repeat)
        self.n += 1
        self.save()
class FOIA(models.Model):
    """A tracked records request, created from the first email of a thread.

    Saving a new instance also creates its reminder ``Schedule`` rows
    (see ``save`` and ``to_schedule``).
    """

    #agency = models.CharField(max_length=255, null=True, blank=True)
    sender = models.CharField(max_length=255)
    subject = models.CharField(max_length=255)
    body_html = models.TextField()
    text = models.TextField()
    is_active = models.BooleanField(default=True)
    participants = ArrayField(models.CharField(max_length=255))  # The participants field of the initial email
    type = models.CharField(max_length=255)  # Key into SCHEDULES.
    timestamp = models.DateTimeField()
    closed_on = models.DateTimeField(null=True, blank=True)
    notes = models.TextField()  # this could be helpful

    class Meta:
        ordering = ('-timestamp',)

    def next_date(self):
        """Return the date of the earliest remaining reminder, or None."""
        # Schedule orders by date, so the first row is the next one due.
        upcoming = self.schedule_set.first()
        if upcoming is None:
            return None
        return upcoming.date

    @property
    def primary_email(self):
        """The email flagged ``primary=True`` for this request.

        Raises the usual ``DoesNotExist`` / ``MultipleObjectsReturned`` from
        ``get()`` when there is not exactly one such email.
        """
        return self.email_set.get(primary=True)

    def close(self):
        """Mark the request inactive, stamp ``closed_on`` and drop reminders."""
        self.is_active = False
        # NOTE(review): naive local time; if USE_TZ is enabled this should
        # probably be django.utils.timezone.now() -- confirm project settings.
        self.closed_on = datetime.now()
        self.save()
        self.schedule_set.all().delete()  # kill any planned schedules.

    def days_since(self):
        """Return whole days from ``timestamp`` to today, or to the close date.

        An inactive request without a ``closed_on`` value is measured up to
        today instead of raising AttributeError.
        """
        if not self.is_active and self.closed_on is not None:
            end = self.closed_on.date()
        else:
            end = date.today()
        return (end - self.timestamp.date()).days

    def display_subject(self):
        """Return the subject with every "Fwd: " and "Re: " occurrence removed."""
        return self.subject.replace("Fwd: ", "").replace("Re: ", "")

    def get_schedule_data(self):
        """Return the SCHEDULES entry for ``type``.

        Falls back to the 'default' entry for an unknown type, because every
        caller immediately calls ``.get`` on the result and would otherwise
        fail on None.
        """
        return SCHEDULES.get(self.type) or SCHEDULES.get('default')

    def add_days(self, date, interval):
        """Return ``date`` moved forward by ``interval`` days.

        Uses business-day arithmetic when the schedule sets 'business-days',
        plain calendar days otherwise. Note that the ``date`` parameter
        shadows the imported ``datetime.date`` class inside this method.
        """
        if self.get_schedule_data().get('business-days'):
            return Calendar().addbusdays(date, interval)
        return date + timedelta(days=interval)

    def to_schedule(self):
        """Create the reminder ``Schedule`` rows for this request.

        Each entry of the schedule's 'days' list is added on top of the
        previous reminder date (intervals are cumulative). If 'every-after'
        is set, one extra repeatable reminder follows the last one.
        """
        sched = self.get_schedule_data()
        due = self.timestamp.date()
        # If it's past 6pm, it's really the next day.
        if self.timestamp.time() >= time(hour=18):
            due += timedelta(days=1)

        # Explicit counter instead of reusing the loop variable afterwards:
        # that raised NameError whenever 'days' was empty.
        n = 0
        for interval in sched.get('days') or ():
            due = self.add_days(due, interval)
            Schedule.objects.create(date=due, foia=self, n=n)
            n += 1

        repeat = sched.get('every-after')
        if repeat:
            Schedule.objects.create(
                date=self.add_days(due, repeat),
                foia=self,
                n=n,
                repeatable=True,
            )

    def save(self, *args, **kwargs):
        """Save the request; on first save also build its reminder schedule."""
        # Must be read before saving, since saving assigns the id.
        is_new = not self.id
        result = super(FOIA, self).save(*args, **kwargs)
        if is_new:
            self.to_schedule()
        return result
class Email(models.Model):
    """One received email, attached to the FOIA request it belongs to.

    On first save a primary email creates a new ``FOIA``; any other email
    is attached to the ``FOIA`` of a message it references (see ``save``).
    """

    body_html = models.TextField()
    text = models.TextField()
    timestamp = models.DateTimeField()
    sender = models.CharField(max_length=255)
    recipient = models.CharField(max_length=255)
    subject = models.CharField(max_length=255)
    cc = models.TextField()
    to = models.TextField()
    bcc = models.TextField()
    primary = models.BooleanField(default=False)
    message_id = models.CharField(max_length=255, null=True, blank=True)
    in_reply_to = models.CharField(max_length=255, null=True, blank=True)
    references = models.CharField(max_length=255, null=True, blank=True)
    participants = ArrayField(models.CharField(max_length=255))
    # CASCADE was the implicit default before Django 2.0 (where the argument
    # became mandatory), so spelling it out keeps the existing behaviour.
    foia = models.ForeignKey(FOIA, null=True, blank=True,
                             on_delete=models.CASCADE)

    @cached_property
    def text_lower(self):
        """Lower-cased ``text``, computed once per instance."""
        return self.text.lower()

    def parse_hashtags(self):
        """Return the lower-cased whitespace-separated words that start
        with "#", with that leading "#" removed."""
        return [word[1:] for word in self.text_lower.split()
                if word.startswith("#")]

    def get_schedule(self):
        """Return the SCHEDULES key for this email.

        The key is the lower-cased local part of ``recipient``; 'default' is
        returned when that is not a SCHEDULES key.
        """
        sched = self.recipient.split('@')[0].lower()
        # `in` works on Python 2 and 3; dict.has_key() is Python 2 only.
        if sched not in SCHEDULES:
            sched = 'default'
        return sched

    @property
    def references_list(self):
        """``references`` split on whitespace, or None when it is unset."""
        try:
            return self.references.split()
        except AttributeError:  # references is None
            return None

    @property
    def in_reply_to_list(self):
        """``in_reply_to`` split on whitespace, or None when it is unset."""
        try:
            return self.in_reply_to.split()
        except AttributeError:  # in_reply_to is None
            return None

    def save(self, *args, **kwargs):
        """Save the email, linking it to a FOIA the first time it is stored.

        Raises:
            FOIA.DoesNotExist: for a non-primary email when no stored email
                matches its References / In-Reply-To ids.
            FOIA.MultipleObjectsReturned: when those ids point at emails of
                more than one FOIA.
        """
        if self.id:
            return super(Email, self).save(*args, **kwargs)

        if self.primary:
            foia = FOIA.objects.create(
                subject=self.subject,
                body_html=self.body_html,
                text=self.text,
                participants=self.participants,
                type=self.get_schedule(),
                sender=self.sender,
                timestamp=self.timestamp,
            )
        else:
            # `or []`: either header may be absent, and an `__in` lookup
            # cannot take None.
            # distinct(): the join over emails yields one row per matching
            # email, so a thread with several referenced messages would
            # otherwise make get() raise MultipleObjectsReturned.
            foia = FOIA.objects.filter(
                Q(email__message_id__in=self.references_list or []) |
                Q(email__message_id__in=self.in_reply_to_list or [])
            ).distinct().get()
        self.foia = foia

        # Quick one to match 'closed.', 'closed!', "#closed it", "CLOSED" etc.
        # NOTE(review): the second test is a plain substring match on the whole
        # body, so it also fires on words such as "enclosed" -- confirm intent.
        if (any('closed' in hashtag for hashtag in self.parse_hashtags())
                or 'closed' in self.text_lower.strip()):
            foia.close()
        return super(Email, self).save(*args, **kwargs)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _schedule(days, every_after, business_days):
    """Build one SCHEDULES entry.

    days          -- successive intervals; each one is added on top of the
                     previous reminder date.
    every_after   -- interval for the repeating reminder that follows.
    business_days -- True to count business days instead of calendar days.
    """
    return {
        'days': list(days),
        'every-after': every_after,
        'business-days': business_days,
    }


# Reminder schedules, keyed by the lower-cased local part of the address a
# request was sent to; 'default' is used when no other key matches.
SCHEDULES = {
    'ny': _schedule([5, 20, 30], every_after=5, business_days=True),
    'ca': _schedule([10, 14], every_after=7, business_days=False),
    'foia': _schedule([30], every_after=14, business_days=False),
    'default': _schedule([30], every_after=14, business_days=False),
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.shortcuts import render | |
from django.http import HttpResponse | |
from django.db.models import Q | |
from datetime import datetime | |
from email import utils | |
from models import FOIA, Email, Schedule | |
from django.views.decorators.csrf import csrf_exempt | |
from pprint import pprint | |
from django.contrib.auth.decorators import login_required | |
def status(request):
    """Health-check view: always answers with the plain body "OK"."""
    response = HttpResponse("OK")
    return response
@login_required(login_url='/login/google-oauth2')
def index(request):
    """List the logged-in user's requests using the 'list.html' template.

    Users whose address ends in '@mydomain.com' may pass a non-empty ``all``
    query parameter to list every request instead of only their own.
    """
    user_email = request.user.email
    # Sometimes you might wanna see it all....
    # endswith(), not `in`: a substring test would also let through
    # addresses such as "someone@mydomain.com.evil.org".
    if request.GET.get('all') and user_email.endswith('@mydomain.com'):
        foias = FOIA.objects.all()
    else:
        foias = FOIA.objects.filter(sender=user_email)
    return render(request, 'list.html', {'foias': foias})
# Normally Django will reject a POST without a CSRF token with a 403.
# Since we're getting these requests externally, we need to mark the view exempt.
@csrf_exempt
def rcv(request):
    """Store an inbound email posted as form data.

    Responds "Duplicate" when an email with the same Message-Id is already
    stored, 400 "Bad timestamp" when the ``timestamp`` field is missing or
    not a usable number, and "OK" once the email has been saved.

    NOTE(review): the field names look like Mailgun's inbound-route payload,
    and nothing here verifies that the request really comes from that
    service -- consider checking the webhook signature.
    """
    post = request.POST
    # lists() exists on Python 2 and 3; iterlists() is Python 2 only.
    pprint(dict(post.lists()))

    sender = post.get('sender', '')
    recipient = post.get('recipient', '')
    subject = post.get('subject', '')
    body_html = post.get('body-html', '')
    text = post.get('stripped-text', '')
    cc = post.get('Cc', '')
    to = post.get('To', '')
    bcc = post.get('Bcc', '')
    message_id = post.get('Message-Id', '')
    in_reply_to = post.get('In-Reply-To', '')
    references = post.get('References', '')

    # Every Cc/Bcc/To address except empty ones and our own mailbox.
    participants = [
        address
        for _name, address in utils.getaddresses([cc, bcc, to])
        if address and 'foiaminder' not in address
    ]

    if Email.objects.filter(message_id=message_id).exists():
        return HttpResponse("Duplicate")

    # A message that refers to an email we already hold is a follow-up;
    # anything else starts a new request.
    is_follow_up = Email.objects.filter(
        Q(message_id__in=in_reply_to.split()) |
        Q(message_id__in=references.split())
    ).exists()

    # Answer with a 400 instead of crashing with a 500 on a bad timestamp.
    try:
        timestamp = datetime.fromtimestamp(float(post.get('timestamp')))
    except (TypeError, ValueError, OverflowError, OSError):
        return HttpResponse("Bad timestamp", status=400)

    Email.objects.create(
        body_html=body_html,
        text=text,
        timestamp=timestamp,
        sender=sender,
        recipient=recipient,
        subject=subject,
        cc=cc,
        to=to,
        bcc=bcc,
        message_id=message_id,
        in_reply_to=in_reply_to or None,
        references=references or None,
        participants=participants,
        primary=not is_follow_up,
    )
    return HttpResponse("OK")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment