Skip to content

Instantly share code, notes, and snippets.

@brydavis
Created August 3, 2019 08:26
Show Gist options
  • Save brydavis/0760fee5fdc2cc95430df47f32a0f587 to your computer and use it in GitHub Desktop.
Save brydavis/0760fee5fdc2cc95430df47f32a0f587 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
" from peewee import *\n",
"\n",
"database = SqliteDatabase('demo.db')\n",
"database.connect()\n",
"database.execute_sql('PRAGMA foreign_keys = ON;')\n",
"\n",
"class BaseModel(Model):\n",
" class Meta:\n",
" database = database\n",
"\n",
"\n",
"class Person(BaseModel):\n",
" \"\"\"\n",
" This class defines Person, which maintains details of someone\n",
" for whom we want to research career to date.\n",
" \"\"\"\n",
" person_name = CharField(primary_key = True, max_length = 30)\n",
" lives_in_town = CharField(max_length = 40)\n",
" nickname = CharField(max_length = 20, null = True)\n",
"\n",
"class Job(BaseModel):\n",
" \"\"\"\n",
" This class defines Job, which maintains details of past Jobs\n",
" held by a Person.\n",
" \"\"\"\n",
" job_name = CharField(primary_key = True, max_length = 30)\n",
" start_date = DateField(formats = 'YYYY-MM-DD')\n",
" end_date = DateField(formats = 'YYYY-MM-DD')\n",
" salary = DecimalField(max_digits = 7, decimal_places = 2)\n",
" person_employed = ForeignKeyField(Person, related_name='was_filled_by', null = False)\n",
"\n",
"class PersonNumKey(BaseModel):\n",
" \"\"\"\n",
" This class defines PersonNumKey, which keeps the same details as Person\n",
" but declares no primary key of its own, so rows are identified by a generated id.\n",
" \"\"\"\n",
" person_name = CharField(max_length = 30)\n",
" lives_in_town = CharField(max_length = 40)\n",
" nickname = CharField(max_length = 20, null = True)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"database.execute_sql('drop table if exists job;')\n",
"database.execute_sql('drop table if exists person;')\n",
"database.execute_sql('drop table if exists personnumkey;')\n",
"\n",
"Person.create_table()\n",
"Job.create_table()\n",
"PersonNumKey.create_table()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Database add successful\n",
"Database add successful\n",
"Database add successful\n",
"Database add successful\n",
"Database add successful\n",
"Read and print all Person records we created...\n",
"Andrew lives in Sumner and likes to be known as Andy\n",
"Peter lives in Seattle and likes to be known as None\n",
"Susan lives in Boston and likes to be known as Beannie\n",
"Pam lives in Coventry and likes to be known as PJ\n",
"Steven lives in Colchester and likes to be known as None\n"
]
}
],
"source": [
"# STEP 1: Add People\n",
"people = [\n",
" ('Andrew', 'Sumner', 'Andy'),\n",
" ('Peter', 'Seattle', None),\n",
" ('Susan', 'Boston', 'Beannie'),\n",
" ('Pam', 'Coventry', 'PJ'),\n",
" ('Steven', 'Colchester', None),\n",
" ]\n",
"\n",
"for person in people:\n",
" try:\n",
" with database.transaction():\n",
" new_person = Person.create(\n",
" person_name = person[0],\n",
" lives_in_town = person[1],\n",
" nickname = person[2])\n",
" new_person.save()\n",
" print('Database add successful')\n",
"\n",
" except Exception as e:\n",
" print(f'Error creating = {person[0]}')\n",
" print(e)\n",
" print('See how the database protects our data')\n",
"\n",
"print('Read and print all Person records we created...')\n",
"\n",
"for person in Person:\n",
" print(f'{person.person_name} lives in {person.lives_in_town} ' +\\\n",
" f'and likes to be known as {person.nickname}')"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Susan lives in Boston and likes to be known as Beannie\n",
"Peter does not have a nickname; see: None\n",
"Changing nickname for Peter\n",
"Update the database\n",
"Steven does not have a nickname; see: None\n",
"Not giving a nickname to Steven\n",
"Peter now has a nickname of Painter\n"
]
}
],
"source": [
"# STEP 2a: Select One Person\n",
"\n",
"aperson = Person.get(Person.person_name == 'Susan')\n",
"print(f'{aperson.person_name} lives in {aperson.lives_in_town} ' + \\\n",
" f' and likes to be known as {aperson.nickname}')\n",
"\n",
"# STEP 2b: Select and Update People Where Criteria Met\n",
"for person in Person.select().where(Person.nickname == None):\n",
" print(f'{person.person_name} does not have a nickname; see: {person.nickname}')\n",
" if person.person_name == 'Peter':\n",
" print('Changing nickname for Peter')\n",
" print('Update the database')\n",
" person.nickname = 'Painter'\n",
" person.save()\n",
" else:\n",
" print(f'Not giving a nickname to {person.person_name}')\n",
"\n",
"aperson = Person.get(Person.person_name == 'Peter')\n",
"print(f'{aperson.person_name} now has a nickname of {aperson.nickname}')\n",
"\n",
"\n",
"# database.close()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"We just created Fred, who lives in Seattle\n",
"Andrew lives in Sumner and likes to be known as Andy\n",
"Peter lives in Seattle and likes to be known as Painter\n",
"Susan lives in Boston and likes to be known as Beannie\n",
"Pam lives in Coventry and likes to be known as PJ\n",
"Steven lives in Colchester and likes to be known as None\n"
]
}
],
"source": [
"# STEP 3a: Add new person, then delete them\n",
"new_person = Person.create(\n",
" person_name = 'Fred',\n",
" lives_in_town = 'Seattle',\n",
" nickname = 'Fearless')\n",
"new_person.save()\n",
"\n",
"\n",
"# STEP 3b: Now delete them\n",
"aperson = Person.get(Person.person_name == 'Fred')\n",
"\n",
"print(f'We just created {aperson.person_name}, who lives in {aperson.lives_in_town}')\n",
"\n",
"aperson.delete_instance()\n",
"\n",
"# show all persons in database\n",
"for person in Person:\n",
" print(f'{person.person_name} lives in {person.lives_in_town} and likes to be known as {person.nickname}')"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Analyst : 2001-09-22 to 2003-01-30 for Andrew\n",
"Senior analyst : 2003-02-01 to 2006-10-22 for Andrew\n",
"Senior business analyst : 2006-10-23 to 2016-12-24 for Andrew\n",
"Admin supervisor : 2012-10-01 to 2014-11,10 for Peter\n",
"Admin manager : 2014-11-14 to 2018-01,05 for Peter\n"
]
}
],
"source": [
"# STEP 4: Add jobs\n",
"jobs = [\n",
" ('Analyst', '2001-09-22', '2003-01-30',65500, 'Andrew'),\n",
" ('Senior analyst', '2003-02-01', '2006-10-22', 70000, 'Andrew'),\n",
" ('Senior business analyst', '2006-10-23', '2016-12-24', 80000, 'Andrew'),\n",
" ('Admin supervisor', '2012-10-01', '2014-11-10', 45900, 'Peter'),\n",
" ('Admin manager', '2014-11-14', '2018-01-05', 45900, 'Peter'),\n",
"]\n",
"\n",
"for job in jobs:\n",
" try:\n",
" with database.transaction():\n",
" new_job = Job.create(\n",
" job_name = job[0],\n",
" start_date = job[1],\n",
" end_date = job[2],\n",
" salary = job[3],\n",
" person_employed = job[4])\n",
" new_job.save()\n",
"\n",
" except Exception as e:\n",
" print(f'Error creating = {job[0]}')\n",
" print(e)\n",
"\n",
"\n",
"# let's see all jobs in database\n",
"for job in Job:\n",
" print(f'{job.job_name} : {job.start_date} to {job.end_date} for {job.person_employed}')\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Person Andrew had job Analyst\n",
"Person Andrew had job Senior analyst\n",
"Person Andrew had job Senior business analyst\n",
"Person Peter had job Admin supervisor\n",
"Person Peter had job Admin manager\n"
]
}
],
"source": [
"# STEP 5: INNER JOIN reading database\n",
"\n",
"query = (Person\n",
" .select(Person, Job)\n",
" .join(Job, JOIN.INNER)\n",
" )\n",
"\n",
"# View matching records from both tables\n",
"for person in query:\n",
" print(f'Person {person.person_name} had job {person.job.job_name}')\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Person Andrew had job Analyst\n",
"Person Andrew had job Senior analyst\n",
"Person Andrew had job Senior business analyst\n",
"Person Peter had job Admin supervisor\n",
"Person Peter had job Admin manager\n",
"Person Susan had no job\n",
"Person Pam had no job\n",
"Person Steven had no job\n",
"Example of how to summarize data\n",
"Note select() creates a count and names it job_count\n",
"group_by and order_by control level and sorting\n",
"Andrew had 3 jobs\n",
"Pam had 0 jobs\n",
"Peter had 2 jobs\n",
"Steven had 0 jobs\n",
"Susan had 0 jobs\n"
]
}
],
"source": [
"# STEP 6: OUTER JOIN reading database\n",
"\n",
"# View matching records and Persons without Jobs (note LEFT_OUTER)\n",
"query = (\n",
" Person\n",
" .select(Person, Job)\n",
" .join(Job, JOIN.LEFT_OUTER)\n",
")\n",
"\n",
"for person in query:\n",
" try:\n",
" print(f'Person {person.person_name} had job {person.job.job_name}')\n",
"\n",
" except Exception as e:\n",
" print(f'Person {person.person_name} had no job')\n",
"\n",
"print('Example of how to summarize data')\n",
"print('Note select() creates a count and names it job_count')\n",
"print('group_by and order_by control level and sorting')\n",
"\n",
"query = (Person\n",
" .select(Person, fn.COUNT(Job.job_name).alias('job_count'))\n",
" .join(Job, JOIN.LEFT_OUTER)\n",
" .group_by(Person)\n",
" .order_by(Person.person_name))\n",
"\n",
"for person in query:\n",
" print(f'{person.person_name} had {person.job_count} jobs')\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Add failed because Harry is not in Person\n",
"For Job create: Sales\n",
"FOREIGN KEY constraint failed\n",
"Trying to delete Andrew who lives in Sumner\n",
"Delete failed because Andrew has Jobs\n",
"Delete failed: Andrew\n",
"FOREIGN KEY constraint failed\n"
]
}
],
"source": [
"# STEP 6a: Try to add a new job where the person doesn't exist...\n",
"\n",
"addjob = ('Sales', '2010-04-01', '2018-02-08', 80400, 'Harry')\n",
"\n",
"# Adding a sales job for Harry\n",
"try:\n",
" with database.transaction():\n",
" new_job = Job.create(\n",
" job_name = addjob[0],\n",
" start_date = addjob[1],\n",
" end_date = addjob[2],\n",
" salary = addjob[3],\n",
" person_employed = addjob[4])\n",
" new_job.save()\n",
"except Exception as e:\n",
" print('Add failed because Harry is not in Person')\n",
" print(f'For Job create: {addjob[0]}')\n",
" print(e)\n",
"\n",
"# STEP 6b: Try to delete a person who has jobs...\n",
"try:\n",
" with database.transaction():\n",
" aperson = Person.get(Person.person_name == 'Andrew')\n",
" print(f'Trying to delete {aperson.person_name} who lives in {aperson.lives_in_town}')\n",
" aperson.delete_instance()\n",
"\n",
"except Exception as e:\n",
" print('Delete failed because Andrew has Jobs')\n",
" print(f'Delete failed: {aperson.person_name}')\n",
" print(e)\n"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Error creating = Andrew\n",
"UNIQUE constraint failed: person.person_name\n",
"Error creating = Peter\n",
"UNIQUE constraint failed: person.person_name\n",
"Error creating = Susan\n",
"UNIQUE constraint failed: person.person_name\n",
"Error creating = Pam\n",
"UNIQUE constraint failed: person.person_name\n",
"Error creating = Steven\n",
"UNIQUE constraint failed: person.person_name\n",
"Name : Andrew with id: 1\n",
"Name : Peter with id: 2\n",
"Name : Susan with id: 3\n",
"Name : Pam with id: 4\n",
"Name : Steven with id: 5\n",
"Name : Andrew with id: 6\n",
"Name : Peter with id: 7\n",
"Name : Susan with id: 8\n",
"Name : Pam with id: 9\n",
"Name : Steven with id: 10\n"
]
}
],
"source": [
"\n",
"# logger.info('Working with Person class')\n",
"# logger.info('Note how I use constants and a list of tuples as a simple schema')\n",
"# logger.info('Normally you probably will have prompted for this from a user')\n",
"\n",
"PERSON_NAME = 0\n",
"LIVES_IN_TOWN = 1\n",
"NICKNAME = 2\n",
"\n",
"people = [\n",
" ('Andrew', 'Sumner', 'Andy'),\n",
" ('Peter', 'Seattle', None),\n",
" ('Susan', 'Boston', 'Beannie'),\n",
" ('Pam', 'Coventry', 'PJ'),\n",
" ('Steven', 'Colchester', None),\n",
" ]\n",
"\n",
"# logger.info('Try creating Person records again: it will fail')\n",
"\n",
"for person in people:\n",
" try:\n",
" with database.transaction():\n",
" new_person = Person.create(\n",
" person_name = person[PERSON_NAME],\n",
" lives_in_town = person[LIVES_IN_TOWN],\n",
" nickname = person[NICKNAME])\n",
" new_person.save()\n",
" print('Database add successful')\n",
"\n",
" except Exception as e:\n",
" print(f'Error creating = {person[PERSON_NAME]}')\n",
" print(e)\n",
"\n",
"# logger.info('We make sure duplicates are not unintentionally created this way')\n",
"# logger.info('BUT: how do we really identify a Person (uniquely)?')\n",
"\n",
"# logger.info('Creating Person records, but in a new table with generated PK...')\n",
"for person in people:\n",
" try:\n",
" with database.transaction():\n",
" new_person = PersonNumKey.create(\n",
" person_name = person[PERSON_NAME],\n",
" lives_in_town = person[LIVES_IN_TOWN],\n",
" nickname = person[NICKNAME])\n",
" new_person.save()\n",
"\n",
" except Exception as e:\n",
" print(f'Error creating = {person[0]}')\n",
" print(e)\n",
"\n",
"# logger.info('Watch what happens when we do it again')\n",
"\n",
"for person in people:\n",
" try:\n",
" with database.transaction():\n",
" new_person = PersonNumKey.create(\n",
" person_name = person[PERSON_NAME],\n",
" lives_in_town = person[LIVES_IN_TOWN],\n",
" nickname = person[NICKNAME])\n",
" new_person.save()\n",
"\n",
" except Exception as e:\n",
" print(f'Error creating = {person[0]}')\n",
" print(e)\n",
"\n",
"# logger.info('Note no PK specified, no PK violation; \"duplicates\" created!')\n",
"\n",
"for person in PersonNumKey.select():\n",
" print(f'Name : {person.person_name} with id: {person.id}')\n"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Tried to change Peter to Peta\n",
"Looked for Peter: found! -> Peter\n",
"Looking for Peta results in zero records. PK changes are ignored and do not throw an error!!!!\n",
"Cant change a PK\n",
"PK \"change\" can only be achieved with a delete and new create\n"
]
}
],
"source": [
"\n",
"# logger.info(\"Back to Person class: try to change Peter's name\")\n",
"\n",
"aperson = Person.get(Person.person_name == 'Peter')\n",
"# logger.info(f'Current value is {aperson.person_name}')\n",
"\n",
"# logger.info('Update Peter to Peta, thereby trying to change the PK...')\n",
"\n",
"try:\n",
" with database.transaction():\n",
" aperson = Person.get(Person.person_name == 'Peter')\n",
" aperson.person_name = 'Peta'\n",
" aperson.save()\n",
" print(f'Tried to change Peter to {aperson.person_name}')\n",
"\n",
"except Exception as e:\n",
" print(f'Cant change a PK and caught you trying') # not caught; no error thrown by Peewee\n",
" print(e)\n",
"\n",
"aperson = Person.get(Person.person_name == 'Peter')\n",
"print(f'Looked for Peter: found! -> {aperson.person_name}')\n",
"\n",
"try:\n",
" aperson = Person.get(Person.person_name == 'Peta')\n",
"\n",
"except Exception as e:\n",
" print(f'Looking for Peta results in zero records. PK changes are ignored and do not throw an error!!!!')\n",
" print(f'Cant change a PK')\n",
" print('PK \"change\" can only be achieved with a delete and new create')\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Close the database when done\n",
"database.close()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"# non-ORM way to interact with database\n",
"\n",
"import pyodbc\n",
"import sqlite3\n"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(1, 2)\n"
]
}
],
"source": [
"with sqlite3.connect(\"demo.db\") as conn:\n",
" cursor = conn.execute(\"select 1 as one, 2 as two\")\n",
" for row in cursor.fetchall():\n",
" print(row)\n",
"# print({\n",
"# cursor.description[i][0]: value \n",
"# for i, value in enumerate(row)\n",
"# })"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"with sqlite3.connect(\"demo.db\") as conn:\n",
" cursor = conn.execute(\"\"\"\n",
" select *\n",
" from person\n",
" \"\"\")\n",
" \n",
" people = []\n",
" for row in cursor.fetchall():\n",
"# print(list(row))\n",
" people.append({\n",
" \"name\": row[0],\n",
" \"city\": row[1],\n",
" \"nickname\": row[2],\n",
" })\n",
"\n",
" print(people)"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Person Andrew had job Analyst\n",
"Person Andrew had job Senior analyst\n",
"Person Andrew had job Senior business analyst\n",
"Person Peter had job Admin supervisor\n",
"Person Peter had job Admin manager\n"
]
}
],
"source": [
"# STEP 5: INNER JOIN reading database\n",
"\n",
"query = (\n",
" Person\n",
" .select(Person, Job)\n",
" .join(Job, JOIN.INNER)\n",
")\n",
"\n",
"# View matching records from both tables\n",
"for person in query:\n",
" print(f'Person {person.person_name} had job {person.job.job_name}')\n"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Person Andrew had job Analyst\n",
"Person Andrew had job Senior analyst\n",
"Person Andrew had job Senior business analyst\n",
"Person Peter had job Admin supervisor\n",
"Person Peter had job Admin manager\n"
]
}
],
"source": [
"with sqlite3.connect(\"demo.db\") as conn:\n",
" cursor = conn.execute(\"\"\"\n",
" select *\n",
" from \n",
" person as p\n",
" inner join job as j\n",
" on j.person_employed_id = p.person_name \n",
" \"\"\")\n",
" \n",
" for row in cursor.fetchall():\n",
" row = {\n",
" cursor.description[i][0]: value \n",
" for i, value in enumerate(row)\n",
" }\n",
" \n",
" print('Person {person_name} had job {job_name}'.format(\n",
" person_name=row[\"person_name\"],\n",
" job_name=row[\"job_name\"],\n",
" ))\n",
" \n"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"('Peter', 2)\n",
"('Steven', 0)\n",
"('Susan', 0)\n"
]
}
],
"source": [
"with sqlite3.connect(\"demo.db\") as conn:\n",
" cursor = conn.execute(\"\"\"\n",
" select \n",
" p.person_name\n",
" , count(j.person_employed_id) as number_of_positions\n",
" from \n",
" person as p\n",
" left join job as j\n",
" on j.person_employed_id = p.person_name \n",
" where p.person_name not in (\"Andrew\", \"Sally\", \"Pam\")\n",
" group by p.person_name\n",
" \"\"\")\n",
" \n",
" for row in cursor.fetchall():\n",
" print(row)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment