|
package remoterun |
|
|
|
import cats.{Applicative, FlatMap, Functor, Monad} |
|
import cats.effect.IO |
|
import net.schmizz.sshj.SSHClient |
|
import net.schmizz.sshj.transport.verification.{FingerprintVerifier, HostKeyVerifier, PromiscuousVerifier} |
|
|
|
import java.net.{Inet4Address, InetAddress, InetSocketAddress} |
|
import net.schmizz.sshj.SSHClient |
|
import net.schmizz.sshj.common.{IOUtils, LoggerFactory, StreamCopier} |
|
import net.schmizz.sshj.connection.channel.direct.Session |
|
import net.schmizz.sshj.connection.channel.direct.Session.Command |
|
import net.schmizz.sshj.sftp.Response.StatusCode |
|
import net.schmizz.sshj.sftp.{FileAttributes, RemoteResourceInfo, SFTPClient, SFTPException} |
|
import net.schmizz.sshj.xfer.scp.SCPFileTransfer |
|
|
|
import java.io.{Console, File, IOException} |
|
import java.util.concurrent.TimeUnit |
|
import scala.util.control.NonFatal |
|
import tofu.syntax.monadic._ |
|
|
|
import scala.collection.mutable |
|
import scala.language.higherKinds |
|
|
|
object Run extends App { |
|
|
|
System.setProperty("scala.time","true") |
|
|
|
|
|
pack("projectname1") |
|
pack("projectname2") |
|
|
|
|
|
val pkey = "/absolute/path/to/private/key" |
|
|
|
val ssh = new SSHClient() |
|
ssh.addHostKeyVerifier(new PromiscuousVerifier) |
|
|
|
|
|
val host = "target ip or hostname" |
|
|
|
ssh.connect(host) |
|
|
|
println("Connected: "+ssh.isConnected) |
|
ssh.authPublickey("root", pkey) |
|
println("Authorized: "+ssh.isAuthenticated) |
|
|
|
|
|
val scp = ssh.newSCPFileTransfer() |
|
val started_at = System.currentTimeMillis() |
|
|
|
val remote_dir = "/data/remote-run" |
|
|
|
val program = sync[IO]( |
|
"pack/lib", |
|
Actions.local("tests/target"), |
|
Actions.remote( |
|
remote_dir, |
|
Sftp.io(ssh.newSFTPClient()), |
|
ssh.newSCPFileTransfer() |
|
) |
|
)( |
|
pretest( |
|
s"${remote_dir}/.remoterun", |
|
s"${remote_dir}/pack/lib", |
|
Sftp.io(ssh.newSFTPClient()) |
|
) |
|
) |
|
|
|
println("------- now unsafeRunSync -------") |
|
program.unsafeRunSync() |
|
|
|
println(s"Done sync in ${(System.currentTimeMillis() - started_at)/1000}s") |
|
|
|
val cp = new File("tests/target/pack/lib") |
|
.list() |
|
.filter(_.endsWith("jar")) |
|
.sorted |
|
.map("/data/remote-run/pack/lib/"+_) |
|
.mkString(":") |
|
|
|
val javacmd = s"""java -Dfile.encoding=UTF-8 -classpath "${cp}" ${args.mkString(" ")}""" |
|
|
|
val dockercmd = s"""docker run --rm -t -v /data:/data openjdk:11.0.7-jre ${javacmd}""" |
|
|
|
execLive(dockercmd) |
|
|
|
println("Done remote exec!") |
|
// Thread.sleep(10000) |
|
println("Totally done") |
|
|
|
|
|
|
|
|
|
|
|
/*** |
|
* |
|
* |
|
* |
|
* |
|
* |
|
************************************************************************************************ |
|
* |
|
* |
|
* |
|
* |
|
*/ |
|
|
|
def pretest[F[_] : FlatMap](markfile:String, remote_pack:String, sftp:Sftp[F]):F[Unit] = for { |
|
b1 <- sftp.exists(markfile) |
|
_ = require(b1, s"${markfile} must exists") |
|
b2 <- sftp.exists(remote_pack) |
|
_ = if(b2) () else sftp.mkdir(remote_pack) |
|
_ = println("Done pretest!") |
|
} yield () |
|
|
|
|
|
|
|
|
|
def sync[F[_]: Monad]( |
|
pack:String, |
|
local:Actions[F], |
|
remote:RemoteActions[F] |
|
)(pretest:F[Unit]):F[Unit] = { |
|
for { |
|
_ <- pretest |
|
local_list <- local.list_files(pack) |
|
local_list_prefixed = local_list.map(pack + "/" + _) |
|
local_md5 <- local.md5(local_list_prefixed) |
|
_ = println("??---") |
|
remote_list <- remote.list_files(pack) |
|
remote_md5 <- (if(remote_list.size == 0) Map.empty[String,String].pure[F] else remote.md5(remote_list.map(pack + "/" + _)).map(_.toMap)) |
|
} yield { |
|
|
|
println("Files listed. Syncing...") |
|
|
|
val skipped = mutable.Buffer.empty[String] |
|
|
|
local_md5.toList.sortBy(_._1).foreach { case (p,md5) => |
|
remote_md5.get(p).exists(_ == md5) match { |
|
case true => //skip |
|
// println(s"--Skip: ${p}") |
|
skipped.append(p) |
|
case false => |
|
println(s"Uploading: ${p}") |
|
remote.upload(local.abs(p), p).unsafeRunSync() |
|
} |
|
} |
|
|
|
println(s"Done uploading, skipped=${skipped.size}") |
|
} |
|
} |
|
|
|
|
|
def pack(projectName:String,version:String = "0.0.0"): Unit ={ |
|
|
|
import scala.sys.process._ |
|
|
|
val stdLogger = ProcessLogger(s => System.out.println(s), s => System.err.println(s)) |
|
|
|
val jarname = s"${projectName}_2.12-${version}.jar" |
|
|
|
val command = s"""zip -r ../${jarname} .""" |
|
|
|
println(command) |
|
scala.sys.process.Process(command,new File(s"${projectName}/target/scala-2.12/classes")).!!(stdLogger) |
|
|
|
|
|
val command2 = s"""mv ${projectName}/target/scala-2.12/$jarname tests/target/pack/lib/""" |
|
|
|
scala.sys.process.Process(command2).!!(stdLogger) |
|
} |
|
|
|
|
|
case class Response(out:String, err:String) |
|
case class Error(code:Int, message:Option[String], coredumped:Option[Boolean], response: Response) |
|
|
|
def exec(command:String):Either[Error, Response] = { |
|
val session = ssh.startSession() |
|
val c = session.exec(command) |
|
|
|
val stdout = IOUtils.readFully(c.getInputStream).toString |
|
val stderr = IOUtils.readFully(c.getErrorStream).toString |
|
|
|
val response = Response(stdout, stderr) |
|
|
|
c.getExitStatus.toInt match { |
|
case 0 => Right(response) |
|
case code => |
|
Left( |
|
Error( |
|
code, |
|
Option(c.getExitErrorMessage), |
|
Option(c.getExitWasCoreDumped), |
|
response |
|
) |
|
) |
|
} |
|
|
|
|
|
} |
|
|
|
def execLive(command:String):Unit = { |
|
|
|
println(s"[EXEC] ${command}") |
|
|
|
val session = ssh.startSession() |
|
val c = session.exec(command) |
|
|
|
val copier = new StreamCopier(c.getInputStream, System.out, LoggerFactory.DEFAULT) |
|
|
|
val result = copier.spawn("spawnout") |
|
|
|
val err = new StreamCopier(c.getErrorStream, System.err, LoggerFactory.DEFAULT) |
|
val result2 = err.spawn("spawnerr") |
|
|
|
|
|
println("SPAWNOUT:\n"+result) |
|
println("SPAWNERR\n"+result2) |
|
} |
|
|
|
implicit class SshOps(private val ssh:SSHClient) extends AnyVal { |
|
def md5(path:String):Either[Error,String] = |
|
exec(s"md5sum ${path} | awk '{ print $$1 }'") |
|
.right.map(_.out.trim) |
|
} |
|
|
|
|
|
trait Actions[F[_]] { |
|
|
|
def abs(path:String):String |
|
|
|
def list_files(path:String):F[Array[String]] |
|
def md5(items:Traversable[String]):F[Traversable[(String, String)]] |
|
} |
|
|
|
trait RemoteActions[F[_]] extends Actions[F]{ |
|
def upload(localPath:String, remotePath:String):IO[Unit] |
|
} |
|
|
|
object Actions { |
|
|
|
def local(scope0:String):Actions[IO] = { |
|
|
|
val scope = new File(scope0) |
|
|
|
new Actions[IO] { |
|
|
|
def abs(path:String):String = { |
|
require(path.startsWith("/") == false) |
|
s"${scope0}/${path}" |
|
} |
|
|
|
def list_files(path: String): IO[Array[String]] = IO { |
|
scope.toPath.resolve(path) |
|
.toFile |
|
.listFiles() |
|
.filter(_.isDirectory == false) |
|
.map(_.getName) |
|
} |
|
|
|
def md5(items: Traversable[String]): IO[Traversable[(String, String)]] = IO { |
|
val command = s"md5 -r ${items.mkString(" ")}" |
|
val out = scala.sys.process.Process(command,scope).!! |
|
|
|
out |
|
.lines |
|
.map(_.split(' ')) |
|
.map { |
|
case Array(a, b) => (b, a) |
|
} |
|
.toVector |
|
} |
|
} |
|
} |
|
|
|
def remote(scope:String, sftp: Sftp[IO], scp:SCPFileTransfer):RemoteActions[IO] = new RemoteActions[IO] { |
|
|
|
def abs(path:String):String = { |
|
require(path.startsWith("/") == false) |
|
s"${scope}/${path}" |
|
} |
|
|
|
def list_files(path: String): IO[Array[String]] = for { |
|
list <- sftp.ls(scope + "/"+ path) |
|
} yield { |
|
list.toArray.map(_.getName) |
|
} |
|
|
|
def md5(items: Traversable[String]): IO[Traversable[(String, String)]] = IO { |
|
|
|
exec(s"md5sum ${items.map(scope + "/" + _).toList.mkString(" ")}") |
|
.map( |
|
_.out.lines.toVector |
|
.map(_.split("\\s+").toArray) |
|
.map { |
|
case Array(a,b) => (b.drop(scope.length + 1),a) |
|
} |
|
) match { |
|
case Left(e) => throw new Exception(e.toString) |
|
case Right(x) => x |
|
} |
|
} |
|
|
|
def upload(localPath:String, remotePath:String):IO[Unit] = IO { |
|
scp.upload(localPath, s"${scope}/${remotePath}") |
|
} |
|
|
|
} |
|
} |
|
|
|
|
|
trait Sftp[F[_]]{ |
|
def exists(path:String):F[Boolean] |
|
def stat(path:String):F[FileAttributes] |
|
|
|
def mkdir(path:String):F[Unit] |
|
|
|
def ls(path:String):F[Traversable[RemoteResourceInfo]] |
|
} |
|
|
|
object Sftp { |
|
|
|
import scala.collection.JavaConverters._ |
|
|
|
def io(client:SFTPClient):Sftp[IO] = new Sftp[IO] { |
|
def exists(path:String):IO[Boolean] = |
|
stat(path) |
|
.map(_ => true) |
|
.redeem({ |
|
case e:SFTPException if e.getStatusCode == StatusCode.NO_SUCH_FILE => false |
|
}, x => x) |
|
|
|
def stat(path:String):IO[FileAttributes] = |
|
IO(client.stat(path)) |
|
|
|
def mkdir(path:String):IO[Unit] = IO( client.mkdir(path) ) |
|
|
|
def ls(path:String):IO[Traversable[RemoteResourceInfo]] = IO( client.ls(path).asScala) |
|
|
|
|
|
} |
|
} |
|
} |