Skip to content

Instantly share code, notes, and snippets.

@zhehaowang
Last active March 8, 2016 22:36
Show Gist options
  • Save zhehaowang/00ba52c405bf2f4b5e71 to your computer and use it in GitHub Desktop.
Save zhehaowang/00ba52c405bf2f4b5e71 to your computer and use it in GitHub Desktop.
Test for NAC in CCL (PyNDN)
Test for NAC in CCL (PyNDN)

What to expect:

For example,

  • Consumer names:

    • Certificate name: /ndn/member0/KEY/ksk-1456805664/ID-CERT/%FD%00%00%01S0bVr
    • Key name: /ndn/member0/ksk-1456805664
    • Group name: /prefix/READ/a
    • Tries to consume: /prefix/SAMPLE/a/b/c/20150825T080000
  • Consumer actions and knowledge:

    • Publish its own certificate
    • Consume data (issue interest for data name, gets encrypted data name, asks for encrypted C-key, asks for encrypted D-key to decrypt the C-key (TODO: revise this), consults local database to decrypt the D-key)
    • Knows the group name it's in
  • Producer:

    • Produced data name: /prefix/SAMPLE/a/b/c/20150825T080000/FOR/prefix/SAMPLE/a/b/c/C-KEY/20150825T080000
    • C-key name: /prefix/SAMPLE/a/b/c/C-KEY/20150825T080000
  • Producer actions and knowledge:

    • Produce and publish data
    • Try to fetch keys from all group whose name is a prefix of the producer
    • Encrypts C-key with the fetched group public key, and publishes encrypted C-key
  • Group manager:

    • Group name: /prefix/SAMPLE/a (prefix = "/prefix", dataType = "a")
    • Group's public key: /prefix/READ/a/E-KEY/20150825T050000/20150825T100000
    • Group's encrypted D-key for a consumer: /prefix/READ/a/D-KEY/20150825T050000/20150825T100000/FOR/ndn/member0/ksk-1456805664
  • Group manager actions and knowledge:

    • Adds member with the certificate name "/ndn/member0/KEY/ksk-1456805664/ID-CERT/%FD%00%00%01S0bVr"
    • Publish the group's public key, generate E-key and D-key for its member, and publish encrypted D-key for each member

Testing

DPU do bounding box

Group encrypts its private key using users' E-key, and encrypts users' D-key with user's public key

So user gets the encrypted D-key, fetches the group's private key encrypted for himself, and decrypts the encrypted C-key published by producer; which means the description above's not accurate

import unittest as ut
import os, time, base64
from pyndn import Name, Data, Face
from pyndn.util import Blob, MemoryContentCache
from pyndn.encrypt import Consumer, Sqlite3ConsumerDb, EncryptedContent
from pyndn.security import KeyType, KeyChain, RsaKeyParams
from pyndn.security.certificate import IdentityCertificate
from pyndn.security.identity import IdentityManager
from pyndn.security.identity import BasicIdentityStorage, FilePrivateKeyStorage
from pyndn.security.policy import NoVerifyPolicyManager
DATA_CONTENT = bytearray([
0xcb, 0xe5, 0x6a, 0x80, 0x41, 0x24, 0x58, 0x23,
0x84, 0x14, 0x15, 0x61, 0x80, 0xb9, 0x5e, 0xbd,
0xce, 0x32, 0xb4, 0xbe, 0xbc, 0x91, 0x31, 0xd6,
0x19, 0x00, 0x80, 0x8b, 0xfa, 0x00, 0x05, 0x9c
])
class TestConsumer(object):
def __init__(self, face):
# Set up face
self.face = face
self.databaseFilePath = "policy_config/test_consumer.db"
try:
os.remove(self.databaseFilePath)
except OSError:
# no such file
pass
self.groupName = Name("/prefix/READ/a")
# Set up the keyChain.
identityStorage = BasicIdentityStorage()
privateKeyStorage = FilePrivateKeyStorage()
self.keyChain = KeyChain(
IdentityManager(identityStorage, privateKeyStorage),
NoVerifyPolicyManager())
identityName = Name("/ndn/member0")
self.certificateName = self.keyChain.createIdentityAndCertificate(identityName)
self.face.setCommandSigningInfo(self.keyChain, self.certificateName)
consumerKeyName = IdentityCertificate.certificateNameToPublicKeyName(self.certificateName)
consumerCertificate = identityStorage.getCertificate(self.certificateName, True)
self.consumer = Consumer(
face, self.keyChain, self.groupName, consumerKeyName,
Sqlite3ConsumerDb(self.databaseFilePath))
# TODO: Read the private key to decrypt d-key...this may or may not be ideal
base64Content = None
with open(privateKeyStorage.nameTransform(consumerKeyName.toUri(), ".pri")) as keyFile:
base64Content = keyFile.read()
der = Blob(base64.b64decode(base64Content), False)
self.consumer.addDecryptionKey(consumerKeyName, der)
self.memoryContentCache = MemoryContentCache(self.face)
self.memoryContentCache.registerPrefix(identityName, self.onRegisterFailed, self.onDataNotFound)
self.memoryContentCache.add(consumerCertificate)
print "Consumer certificate name: " + self.certificateName.toUri()
return
def onDataNotFound(self, prefix, interest, face, interestFilterId, filter):
print "Data not found for interest: " + interest.getName().toUri()
return
def onRegisterFailed(self, prefix):
print "Prefix registration failed"
return
def consume(self, contentName):
self.consumer.consume(contentName, self.onConsumeComplete, self.onConsumeFailed)
def onConsumeComplete(self, data, result):
print "Consume complete for data name: " + data.getName().toUri()
# Test the length of encrypted data
# dataBlob = data.getContent()
# dataContent = EncryptedContent()
# dataContent.wireDecode(dataBlob)
# encryptedData = dataContent.getPayload()
# print len(encryptedData)
if result.equals(Blob(DATA_CONTENT, False)):
print "Got expected content"
else:
print "Didn't get expected content"
return
def onConsumeFailed(self, code, message):
print "Consume error " + str(code) + ": " + message
if __name__ == "__main__":
print "Start NAC consumer test"
face = Face()
testConsumer = TestConsumer(face)
contentName = Name("/prefix/SAMPLE/a/b/c/20150825T080000")
testConsumer.consume(contentName)
print "Trying to consume: " + contentName.toUri()
while True:
face.processEvents()
# We need to sleep for a few milliseconds so we don't use 100% of the CPU.
time.sleep(0.01)
import unittest as ut
import os, time
from pyndn import Name, Data, Face, Interest, Exclude
from pyndn.util import Blob, MemoryContentCache
from pyndn.encrypt import GroupManager, Sqlite3GroupManagerDb, EncryptedContent
from pyndn.encrypt import Schedule, RepetitiveInterval, DecryptKey, EncryptKey
from pyndn.encrypt.algo import Encryptor, AesAlgorithm, RsaAlgorithm
from pyndn.encrypt.algo import EncryptParams, EncryptAlgorithmType
from pyndn.security import KeyChain, RsaKeyParams
from pyndn.security.identity import IdentityManager
from pyndn.security.identity import MemoryIdentityStorage, MemoryPrivateKeyStorage
from pyndn.security.policy import NoVerifyPolicyManager
DATA_CONTENT = bytearray([
0xcb, 0xe5, 0x6a, 0x80, 0x41, 0x24, 0x58, 0x23,
0x84, 0x14, 0x15, 0x61, 0x80, 0xb9, 0x5e, 0xbd,
0xce, 0x32, 0xb4, 0xbe, 0xbc, 0x91, 0x31, 0xd6,
0x19, 0x00, 0x80, 0x8b, 0xfa, 0x00, 0x05, 0x9c
])
class TestGroupManager(object):
def __init__(self, face):
# Set up face
self.face = face
# Set up the keyChain.
identityStorage = MemoryIdentityStorage()
privateKeyStorage = MemoryPrivateKeyStorage()
self.keyChain = KeyChain(
IdentityManager(identityStorage, privateKeyStorage),
NoVerifyPolicyManager())
identityName = Name("prefix")
self.certificateName = self.keyChain.createIdentityAndCertificate(identityName)
self.keyChain.getIdentityManager().setDefaultIdentity(identityName)
self.face.setCommandSigningInfo(self.keyChain, self.certificateName)
self.dKeyDatabaseFilePath = "policy_config/manager-d-key-test.db"
try:
os.remove(self.dKeyDatabaseFilePath)
except OSError:
# no such file
pass
self.manager = GroupManager(
identityName, Name("a"),
Sqlite3GroupManagerDb(self.dKeyDatabaseFilePath), 2048, 1,
self.keyChain)
self.memoryContentCache = MemoryContentCache(self.face)
self.memoryContentCache.registerPrefix(identityName, self.onRegisterFailed, self.onDataNotFound)
self.generateGroupKeyFlag = False
return
def setManager(self):
schedule1 = Schedule()
interval11 = RepetitiveInterval(
Schedule.fromIsoString("20150825T000000"),
Schedule.fromIsoString("20150827T000000"), 5, 10, 2,
RepetitiveInterval.RepeatUnit.DAY)
interval12 = RepetitiveInterval(
Schedule.fromIsoString("20150825T000000"),
Schedule.fromIsoString("20150827T000000"), 6, 8, 1,
RepetitiveInterval.RepeatUnit.DAY)
interval13 = RepetitiveInterval(
Schedule.fromIsoString("20150827T000000"),
Schedule.fromIsoString("20150827T000000"), 7, 8)
schedule1.addWhiteInterval(interval11)
schedule1.addWhiteInterval(interval12)
schedule1.addBlackInterval(interval13)
self.manager.addSchedule("schedule1", schedule1)
# TODO: for now, we ignore the ksk-timestamp component in this request
memberA = Name("/ndn/member0/KEY/")
interest = Interest(memberA)
interest.setInterestLifetimeMilliseconds(4000)
print "Retrieving member certificate: " + interest.getName().toUri()
self.face.expressInterest(interest, self.onMemberCertificateData, self.onMemberCertificateTimeout)
def onMemberCertificateData(self, interest, data):
print "Member certificate with name retrieved: " + data.getName().toUri() + "; member added to group!"
self.generateGroupKeyFlag = True
self.manager.addMember("schedule1", data)
def onMemberCertificateTimeout(self, interest):
print "Member certificate interest times out: " + interest.getName().toUri()
return
def getAndPublishGroupKeys(self):
timePoint1 = Schedule.fromIsoString("20150825T093000")
result = self.manager.getGroupKey(timePoint1)
# The first is group public key, E-key
# The rest are group private keys encrypted with each member's public key, D-key
for i in range(0, len(result)):
self.memoryContentCache.add(result[i])
print "group getKeys result name: " + str(i) + " " + result[i].getName().toUri()
def onDataNotFound(self, prefix, interest, face, interestFilterId, filter):
print "Data not found for interest: " + interest.getName().toUri()
if interest.getExclude():
print "Interest has exclude: " + interest.getExclude().toUri()
return
def onRegisterFailed(self, prefix):
print "Prefix registration failed"
return
if __name__ == "__main__":
print "Start NAC group manager test"
face = Face()
testGroupManager = TestGroupManager(face)
testGroupManager.setManager()
while True:
face.processEvents()
if testGroupManager.generateGroupKeyFlag:
testGroupManager.getAndPublishGroupKeys()
testGroupManager.generateGroupKeyFlag = False
# We need to sleep for a few milliseconds so we don't use 100% of the CPU.
time.sleep(0.01)
import unittest as ut
import os, time
from pyndn import Name, Data, Face
from pyndn.util import Blob, MemoryContentCache
from pyndn.encrypt import Producer, Schedule, Sqlite3ProducerDb, EncryptedContent
from pyndn.encrypt.algo import Encryptor, AesAlgorithm, RsaAlgorithm
from pyndn.encrypt.algo import EncryptParams, EncryptAlgorithmType
from pyndn.security import KeyChain, RsaKeyParams
from pyndn.security.identity import IdentityManager
from pyndn.security.identity import MemoryIdentityStorage, MemoryPrivateKeyStorage
from pyndn.security.policy import NoVerifyPolicyManager
DATA_CONTENT = bytearray([
0xcb, 0xe5, 0x6a, 0x80, 0x41, 0x24, 0x58, 0x23,
0x84, 0x14, 0x15, 0x61, 0x80, 0xb9, 0x5e, 0xbd,
0xce, 0x32, 0xb4, 0xbe, 0xbc, 0x91, 0x31, 0xd6,
0x19, 0x00, 0x80, 0x8b, 0xfa, 0x00, 0x05, 0x9c
])
class TestProducer(object):
def __init__(self, face):
# Set up face
self.face = face
# Set up the keyChain.
identityStorage = MemoryIdentityStorage()
privateKeyStorage = MemoryPrivateKeyStorage()
self.keyChain = KeyChain(
IdentityManager(identityStorage, privateKeyStorage),
NoVerifyPolicyManager())
identityName = Name("TestProducer")
self.certificateName = self.keyChain.createIdentityAndCertificate(identityName)
self.keyChain.getIdentityManager().setDefaultIdentity(identityName)
self.face.setCommandSigningInfo(self.keyChain, self.certificateName)
self.databaseFilePath = "policy_config/test_producer.db"
try:
os.remove(self.databaseFilePath)
except OSError:
# no such file
pass
self.testDb = Sqlite3ProducerDb(self.databaseFilePath)
prefix = Name("/prefix")
suffix = Name("/a/b/c")
self.producer = Producer(prefix, suffix, self.face, self.keyChain, self.testDb)
self.memoryContentCache = MemoryContentCache(self.face)
self.memoryContentCache.registerPrefix(prefix, self.onRegisterFailed, self.onDataNotFound)
return
def onDataNotFound(self, prefix, interest, face, interestFilterId, filter):
print "Data not found for interest: " + interest.getName().toUri()
return
def onRegisterFailed(self, prefix):
print "Prefix registration failed"
return
def createContentKey(self, timeSlot):
print "Creating content key"
contentKeyName = self.producer.createContentKey(timeSlot, self.onEncryptedKeys)
def produce(self, timeSlot):
emptyData = Data()
self.producer.produce(emptyData, timeSlot, Blob(DATA_CONTENT, False))
producedName = emptyData.getName()
# Test the length of encrypted data
# dataBlob = emptyData.getContent()
# dataContent = EncryptedContent()
# dataContent.wireDecode(dataBlob)
# encryptedData = dataContent.getPayload()
# print len(encryptedData)
self.memoryContentCache.add(emptyData)
print "Produced data with name " + producedName.toUri()
def onEncryptedKeys(self, keys):
print "onEncryptedKeys called"
if not keys:
print "onEncryptedKeys: no keys in callback!"
for i in range(0, len(keys)):
print "onEncryptedKeys: produced encrypted key " + keys[i].getName().toUri()
self.memoryContentCache.add(keys[i])
return
if __name__ == "__main__":
print "Start NAC producer test"
face = Face()
testProducer = TestProducer(face)
testTime1 = Schedule.fromIsoString("20150825T080000")
testProducer.createContentKey(testTime1)
testProducer.produce(testTime1)
print "Produced"
# TODO: getting the encrypted C-key to the group manager
while True:
face.processEvents()
# We need to sleep for a few milliseconds so we don't use 100% of the CPU.
time.sleep(0.01)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment