Skip to content

Instantly share code, notes, and snippets.

@AmatsuZero
Created December 3, 2018 15:28
Show Gist options
  • Save AmatsuZero/987d1af42119721ef8169dc9185c5ed2 to your computer and use it in GitHub Desktop.
Save AmatsuZero/987d1af42119721ef8169dc9185c5ed2 to your computer and use it in GitHub Desktop.
RTMP
//
// EHStreamSession.swift
// EShopHelper
//
// Created by Jiang,Zhenhua on 2018/12/3.
// Copyright © 2018 Daubert. All rights reserved.
//
import Foundation
@objc protocol EHStreamSessionDelegate: NSObjectProtocol {
@objc optional func didChange(session: EHStreamSession, status: Stream.Event)
}
@objcMembers class EHStreamSession: NSObject {
private(set) var streamStatus: Stream.Event?
weak var delegate: EHStreamSessionDelegate?
fileprivate var inputStream: InputStream?
fileprivate var outputStream: OutputStream?
func connect(to server: String, port p: UInt32) {
let unavailable: Stream.Event = [.errorOccurred, .endEncountered]
if let status = streamStatus, !unavailable.contains(status) {
close()
}
var readStream: Unmanaged<CFReadStream>?
var writeStream: Unmanaged<CFWriteStream>?
let port = p <= 0 ? 1935 : p
CFStreamCreatePairWithSocketToHost(nil, server as CFString, port, &readStream, &writeStream)
guard let input = readStream?.takeRetainedValue(),
let output = writeStream?.takeRetainedValue() else {
return
}
inputStream = input
inputStream?.delegate = self
outputStream = output
outputStream?.delegate = self
outputStream?.schedule(in: .main, forMode: .defaultRunLoopMode)
inputStream?.schedule(in: .main, forMode: .defaultRunLoopMode)
inputStream?.open()
outputStream?.open()
}
func disconnect() {
close()
}
func read() -> Data? {
var buffer = [UInt8](repeating: 0, count: 48)
var data: Data?
if let len = inputStream?.read(&buffer, maxLength: MemoryLayout.size(ofValue: buffer)),
len < MemoryLayout.size(ofValue: buffer),
streamStatus?.contains(.hasBytesAvailable) ?? false {
streamStatus?.remove(.hasBytesAvailable)
data = Data(bytes: buffer)
}
return data
}
func wrtie(data: Data) -> Int {
guard !data.isEmpty else {
return 0
}
var ret = 0
if let output = outputStream, output.hasSpaceAvailable {
var src = [UInt8]()
data.copyBytes(to: &src, count: data.count)
ret = output.write(&src, maxLength: data.count)
}
if ret > 0, streamStatus?.contains(.hasSpaceAvailable) ?? false {
streamStatus?.remove(.hasSpaceAvailable)
}
return ret
}
private func close() {
inputStream?.close()
outputStream?.close()
inputStream?.remove(from: .current, forMode: .defaultRunLoopMode)
inputStream?.remove(from: .current, forMode: .defaultRunLoopMode)
outputStream = nil
streamStatus = nil
inputStream = nil
inputStream?.delegate = nil
outputStream?.delegate = nil
}
}
extension EHStreamSession: StreamDelegate {
func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
switch eventCode {
case .openCompleted where aStream == inputStream:
streamStatus = eventCode
print("连接成功")
case .hasBytesAvailable:
streamStatus?.insert(eventCode)
print("有字节可读")
case .hasSpaceAvailable:
streamStatus?.insert(eventCode)
print("可以发送字节")
case .errorOccurred:
streamStatus = eventCode
print("链接出现错误")
case .endEncountered:
streamStatus = eventCode
print("链接结束")
default:
return
}
if let delegate = self.delegate,
delegate.responds(to: #selector(EHStreamSessionDelegate.didChange(session:status:))),
let status = self.streamStatus {
delegate.didChange!(session: self, status: status)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment