Skip to content

Instantly share code, notes, and snippets.

@REDNBLACK
Last active September 25, 2021 21:37
Show Gist options
  • Save REDNBLACK/014c7c56cd78b2a92ea4bf507621fad3 to your computer and use it in GitHub Desktop.
Save REDNBLACK/014c7c56cd78b2a92ea4bf507621fad3 to your computer and use it in GitHub Desktop.
Generate iOS *.mobileconfig for ProtonVPN Free Servers
import scala.util._
import scala.util.chaining._
import scala.collection.immutable.SortedMap
import cats._
import cats.syntax.all._
object Generator {
object Domain {
import java.nio.charset.StandardCharsets.UTF_8
import scala.xml.Elem
case class VPNProvider(name: String, domain: Vector[String]) {
import java.util.UUID
def org(reverse: Boolean, prefixes: String*): String =
(prefixes ++ domain).pipe(xs => if (reverse) xs.reverseIterator else xs).mkString(".")
def uuid(prefixes: String*): UUID = UUID.nameUUIDFromBytes(org(false, prefixes: _*).getBytes(UTF_8))
}
opaque type VPNLang = String
object VPNLang {
def apply(name: String): VPNLang = name
given Show[Option[VPNLang]] = _.fold(Monoid.empty[String])(_.show)
}
opaque type VPNNumber = Int
object VPNNumber {
final val Prefix = "0"
def unapply(str: String): Option[VPNNumber] = str.stripPrefix(Prefix).toIntOption
given Show[VPNNumber] = {
case n if n < 10 => s"$Prefix$n"
case n => s"$n"
}
}
enum VPNTier { case Free, Basic, Plus }
object VPNTier {
import io.circe.Decoder
import CommandLineParser.FromString
def orEmpty(tiers: Seq[VPNTier]): Set[VPNTier] = if (tiers.nonEmpty) tiers.toSet else Set(VPNTier.Free)
def fromOrdinalEither(ordinal: Int): Either[String, VPNTier] =
VPNTier.values.find(_.ordinal == ordinal).toRight(s"No such ${classOf[VPNTier].getSimpleName} value: $ordinal")
given Order[VPNTier] = Order.by(_.ordinal)
given Show[VPNTier] = {
case Free => s"$Free"
case _ => Monoid.empty[String]
}
given Decoder[VPNTier] = Decoder[Int].emap(fromOrdinalEither)
given FromString[VPNTier] = _.toInt.pipe(fromOrdinalEither).fold(e => throw IllegalArgumentException(e), identity)
}
sealed trait VPNEntity
object VPNEntity {
import cats.instances.order._
case class Single(tier: VPNTier, num: VPNNumber, lang1: VPNLang, lang2: Option[VPNLang], postfix: Option[String]) extends VPNEntity {
import VPNLang.given
import VPNNumber.given
val (name, host) = Vector(lang1.show, lang2.show, tier.show) pipe { prefix =>
def fmt(it: Iterable[String]*)(up: Boolean) =
Iterator.concat(it: _*).filter(_.nonEmpty).mkString("-").pipe(s => if (up) s.toUpperCase else s.toLowerCase)
show"${fmt(prefix, postfix)(up = true)}#$num" -> fmt(prefix :+ num.show, postfix)(up = false)
}
}
class Many(underlying: Vector[Single], grouped: SortedMap[VPNTier, Int]) extends VPNEntity with Iterable[Single] {
def tiers: String = grouped.keys.mkString(" + ")
def countByTier: String = grouped.view.map { case (t, n) => s"${n}x $t" }.mkString(" + ")
def iterator: Iterator[Single] = underlying.iterator
}
def apply(entities: Vector[Single]): Many = entities.sorted.pipe { sorted =>
Many(sorted, sorted.groupMapReduce(_.tier)(_ => 1)(_ + _).to(SortedMap))
}
def apply(
tier: VPNTier = VPNTier.Free,
lang1: VPNLang,
lang2: Option[VPNLang] = None,
num: VPNNumber,
postfix: Option[String] = None
): Single = Single(tier, num, lang1, lang2, postfix)
given Order[Single] = Order.fromOrdering(
Ordering
.by[Single, VPNTier](_.tier)
.orElseBy(it => (it.lang1, it.lang2))
.orElseBy(_.num)
.orElseBy(_.postfix)
)
}
opaque type VPNSchema = Elem
object VPNSchema {
def apply(e: Elem): VPNSchema = e
given Show[VPNSchema] = schema => {
import java.io.StringWriter
import java.lang.System.{lineSeparator => EOL}
import scala.xml.PrettyPrinter
import scala.xml.dtd.{DocType, PublicID}
StringBuilder()
.append(s"""<?xml version="1.0" encoding="$UTF_8"?>""")
.append(EOL)
.append(DocType("plist", PublicID("-//Apple//DTD PLIST 1.0//EN", "http://www.apple.com/DTDs/PropertyList-1.0.dtd"), Nil))
.append(EOL)
.tap(PrettyPrinter(width = 120, step = 2).format(schema, _))
.result
}
}
case class Credentials(usr: String, pwd: String, cert: String)
}
object Parse {
import io.circe.{Json, Decoder}
import io.circe.jawn.JawnParser
import Domain._
def apply(bytes: Array[Byte], tiers: Set[VPNTier]): Either[String, Vector[VPNEntity.Single]] = {
val N: VPNNumber.type = VPNNumber
val L: VPNLang.type = VPNLang
extension (json: Json) def field[A: Decoder](field: String): Either[String, A] =
json.hcursor.get[A](field).leftMap(_.show)
JawnParser(false)
.parseByteArray(bytes)
.leftMap(_.show)
.flatMap(_.field[Vector[Json]]("LogicalServers"))
.flatMap(_.traverseFilter { server =>
(server.field[VPNTier]("Tier"), server.field[String]("Name")).mapN { (tier, name) =>
if (!tiers(tier)) None
else name match {
case s"$lang-FREE#${N(num)}" =>
VPNEntity(lang1 = L(lang), num = num).some
case s"$lang1-$lang2#${N(num)}-$postfix" =>
VPNEntity(tier, L(lang1), L(lang2).some, num, postfix.some).some
case s"$lang1-$lang2#${N(num)}" =>
VPNEntity(tier, L(lang1), L(lang2).some, num).some
case s"$lang#${N(num)}-$postfix" =>
VPNEntity(tier, L(lang), num = num, postfix = postfix.some).some
case s"$lang#${N(num)}" =>
VPNEntity(tier, L(lang), num = num).some
}
}
})
}
}
object Download {
import java.net.URI
import java.net.http.{HttpRequest, HttpClient, HttpResponse}
import java.time.Duration
import java.nio.file.{Paths, Files}
import java.util.Base64.{getEncoder => B64}
import Domain._
def cert(prov: VPNProvider, url: String): Either[String, String] = {
// download(url, s"${prov.name} Root CA")(B64.encodeToString)
Right(B64.encodeToString(Files.readAllBytes(Paths.get("/Users/rb/Documents/projects/Scala/VPNConf/data/ProtonVPN_Root_CA.der"))))
}
def registry(prov: VPNProvider, url: String, tiers: Set[VPNTier]): Either[String, Vector[VPNEntity.Single]] = {
// download(url, s"${prov.name} Registry")(Parse(_, tiers).fold(sys.error, identity))
Parse(Files.readAllBytes(Paths.get("/Users/rb/Documents/projects/Scala/VPNConf/data/VPNs.json")), tiers)
}
def download[A](url: String, name: String)(fn: Array[Byte] => A): Either[String, A] =
(for {
client <- Try(HttpClient.newBuilder.connectTimeout(Duration.ofSeconds(20)).build)
res <- Try(client.send(HttpRequest.newBuilder(URI(url)).build, HttpResponse.BodyHandlers.ofByteArray()))
decoded <- Try(fn(res.body))
} yield decoded)
.fold(e => s"Failed to download $name: $e".asLeft, _.asRight)
}
object Make {
import Domain._
def vpn(prov: VPNProvider, vpn: VPNEntity.Single, cred: Credentials) =
<dict>
<key>PayloadIdentifier</key>
<string>{prov.org(reverse = true, vpn.host, "vpn")}</string>
<key>PayloadUUID</key>
<string>{prov.uuid(vpn.host, "vpn")}</string>
<key>PayloadType</key>
<string>com.apple.vpn.managed</string>
<key>PayloadVersion</key>
<integer>1</integer>
<key>PayloadDisplayName</key>
<string>{prov.name} {vpn.name}</string>
<key>UserDefinedName</key>
<string>{prov.name} {vpn.name}</string>
<key>VPNType</key>
<string>IKEv2</string>
<key>IKEv2</key>
<dict>
<key>RemoteAddress</key>
<string>{prov.org(reverse = false, vpn.host)}</string>
<key>RemoteIdentifier</key>
<string>{prov.org(reverse = false, vpn.host)}</string>
<key>LocalIdentifier</key>
<string>{cred.usr}</string>
<key>ServerCertificateIssuerCommonName</key>
<string>{prov.name} Root CA</string>
<key>TLSMinimumVersion</key>
<string>1.2</string>
<key>EnablePFS</key>
<integer>1</integer>
<key>DisableRedirect</key>
<true/>
<key>OnDemandEnabled</key>
<integer>0</integer>
<key>OnDemandRules</key>
<array>
<dict>
<key>Action</key>
<string>Connect</string>
</dict>
</array>
<key>AuthenticationMethod</key>
<string>Certificate</string>
<key>ExtendedAuthEnabled</key>
<integer>1</integer>
<key>AuthName</key>
<string>{cred.usr}</string>
<key>AuthPassword</key>
<string>{cred.pwd}</string>
</dict>
</dict>
def cert(prov: VPNProvider, cred: Credentials) =
<dict>
<key>PayloadIdentifier</key>
<string>{prov.org(reverse = true, "ca")}</string>
<key>PayloadUUID</key>
<string>{prov.uuid("ca")}</string>
<key>PayloadType</key>
<string>com.apple.security.root</string>
<key>PayloadVersion</key>
<integer>1</integer>
<key>PayloadContent</key>
<data>{cred.cert}</data>
</dict>
def profile(prov: VPNProvider, entities: VPNEntity.Many, cred: Credentials) = VPNSchema {
<plist version="1.0">
<dict>
<key>PayloadIdentifier</key>
<string>{prov.org(reverse = true, "profile")}</string>
<key>PayloadUUID</key>
<string>{prov.uuid("profile")}</string>
<key>PayloadType</key>
<string>Configuration</string>
<key>PayloadVersion</key>
<integer>1</integer>
<key>PayloadDisplayName</key>
<string>{prov.name} {entities.tiers}</string>
<key>PayloadDescription</key>
<string>This profile installs {entities.countByTier} VPN servers from {prov.name} using native IKEv2.</string>
<key>PayloadContent</key>
<array>
{cert(prov, cred)}
{entities.map { x => println(x.name); x }.flatMap(vpn(prov, _, cred))}
</array>
</dict>
</plist>
}
}
@main def main(user: String, password: String, tiers: Domain.VPNTier*): Unit = {
import Domain._
import VPNSchema.given
val provider = VPNProvider(
name = "ProtonVPN",
domain = Vector("protonvpn", "com")
)
(for {
cert <- Download.cert(provider, "https://protonvpn.com/download/ProtonVPN_ike_root.der")
reg <- Download.registry(provider, "https://api.protonmail.ch/vpn/logicals", VPNTier.orEmpty(tiers))
} yield Make.profile(
prov = provider,
entities = VPNEntity(reg),
cred = Credentials(usr = user, pwd = password, cert = cert)
))
.fold(sys.error, _.show.tap(println))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment