Created
December 3, 2018 15:28
-
-
Save AmatsuZero/987d1af42119721ef8169dc9185c5ed2 to your computer and use it in GitHub Desktop.
RTMP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
//  EHStreamSession.swift
//  EShopHelper
//
//  Created by Jiang,Zhenhua on 2018/12/3.
//  Copyright © 2018 Daubert. All rights reserved.
//
import Foundation | |
/// Observer interface for status changes of an `EHStreamSession`.
///
/// The single requirement is optional, so conforming types may omit it.
@objc protocol EHStreamSessionDelegate: NSObjectProtocol {
    /// Invoked by the session after it has handled a stream event and
    /// updated its status.
    ///
    /// - Parameters:
    ///   - session: The session whose status changed.
    ///   - status: The session's status after the event was applied.
    @objc optional func didChange(session: EHStreamSession, status: Stream.Event)
}
/// A small TCP session built on a Foundation input/output stream pair.
///
/// The streams are scheduled on the main run loop; stream events are
/// delivered through the `StreamDelegate` conformance and surfaced to
/// `delegate` as `streamStatus` changes.
@objcMembers class EHStreamSession: NSObject {
    /// Port used when `connect(to:port:)` is called with port `0`.
    private static let defaultPort: UInt32 = 1935
    /// Maximum number of bytes fetched by a single `read()` call.
    private static let readBufferSize = 48

    /// Accumulated stream events for the current connection, or `nil`
    /// when no connection has reported an event yet.
    private(set) var streamStatus: Stream.Event?
    /// Receiver of status-change notifications.
    weak var delegate: EHStreamSessionDelegate?
    fileprivate var inputStream: InputStream?
    fileprivate var outputStream: OutputStream?

    /// Opens a socket stream pair to `server`.
    ///
    /// Any previously created stream pair is closed and unscheduled first,
    /// regardless of its state, so that reconnecting never leaves old
    /// streams attached to the run loop.
    ///
    /// - Parameters:
    ///   - server: Host name or address to connect to.
    ///   - p: TCP port; `0` selects the default port (1935).
    func connect(to server: String, port p: UInt32) {
        close()

        var readStream: Unmanaged<CFReadStream>?
        var writeStream: Unmanaged<CFWriteStream>?
        let port = p == 0 ? EHStreamSession.defaultPort : p
        CFStreamCreatePairWithSocketToHost(nil, server as CFString, port, &readStream, &writeStream)

        // Take ownership of both references unconditionally so that neither
        // is leaked when only one of the two streams could be created.
        let readRef = readStream?.takeRetainedValue()
        let writeRef = writeStream?.takeRetainedValue()
        guard let input = readRef, let output = writeRef else {
            return
        }

        inputStream = input
        outputStream = output
        inputStream?.delegate = self
        outputStream?.delegate = self
        outputStream?.schedule(in: .main, forMode: .defaultRunLoopMode)
        inputStream?.schedule(in: .main, forMode: .defaultRunLoopMode)
        inputStream?.open()
        outputStream?.open()
    }

    /// Closes the current connection, if any.
    func disconnect() {
        close()
    }

    /// Reads up to 48 bytes from the input stream.
    ///
    /// Data is only read while `streamStatus` carries `.hasBytesAvailable`;
    /// the flag is cleared once a read has been attempted.
    ///
    /// - Returns: Exactly the bytes that were read, or `nil` when nothing is
    ///   available, the stream reached its end, or the read failed.
    func read() -> Data? {
        // Check availability before touching the stream so that bytes are
        // never consumed and then thrown away.
        guard let input = inputStream,
            streamStatus?.contains(.hasBytesAvailable) ?? false else {
            return nil
        }

        var buffer = [UInt8](repeating: 0, count: EHStreamSession.readBufferSize)
        // `buffer.count` is the real capacity; `MemoryLayout.size(ofValue:)`
        // would only report the size of the array's internal reference.
        let len = input.read(&buffer, maxLength: buffer.count)
        streamStatus?.remove(.hasBytesAvailable)

        // 0 means end of stream, a negative value means the read failed.
        guard len > 0 else {
            return nil
        }
        return Data(bytes: buffer, count: len)
    }

    /// Writes `data` to the output stream.
    ///
    /// On a successful write the `.hasSpaceAvailable` flag is cleared from
    /// `streamStatus`.
    ///
    /// - Parameter data: Bytes to send.
    /// - Returns: The value returned by the stream's write call (bytes
    ///   written, or a negative value on error); `0` when `data` is empty,
    ///   there is no output stream, or the stream has no space available.
    func write(data: Data) -> Int {
        guard !data.isEmpty,
            let output = outputStream,
            output.hasSpaceAvailable else {
            return 0
        }

        // Copy into a correctly sized array; copying into an empty array
        // would write past the end of its storage.
        let bytes = [UInt8](data)
        let written = output.write(bytes, maxLength: bytes.count)
        if written > 0 {
            streamStatus?.remove(.hasSpaceAvailable)
        }
        return written
    }

    /// Misspelled predecessor of `write(data:)`, kept so existing callers
    /// continue to compile. Behaves exactly like `write(data:)`.
    func wrtie(data: Data) -> Int {
        return write(data: data)
    }

    /// Detaches, closes and unschedules both streams and resets the status.
    private func close() {
        // Clear the delegates while the streams are still referenced.
        inputStream?.delegate = nil
        outputStream?.delegate = nil
        inputStream?.close()
        outputStream?.close()
        // The streams were scheduled on the main run loop, so they must be
        // removed from that same run loop.
        inputStream?.remove(from: .main, forMode: .defaultRunLoopMode)
        outputStream?.remove(from: .main, forMode: .defaultRunLoopMode)
        inputStream = nil
        outputStream = nil
        streamStatus = nil
    }
}
// MARK: - StreamDelegate

extension EHStreamSession: StreamDelegate {
    /// Folds a stream event into `streamStatus` and notifies the delegate.
    ///
    /// - `.openCompleted` (input stream only) starts a fresh status while
    ///   keeping availability flags that were reported before the open
    ///   completed.
    /// - `.hasBytesAvailable` / `.hasSpaceAvailable` are added to the status.
    /// - `.errorOccurred` / `.endEncountered` replace the status.
    /// - Every other event, and every event from a stream this session does
    ///   not currently hold, is ignored.
    func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        // Ignore events from streams that are not the current pair.
        guard aStream === inputStream || aStream === outputStream else {
            return
        }

        switch eventCode {
        case .openCompleted where aStream === inputStream:
            // Keep availability flags that may have arrived first, but drop
            // anything else (e.g. a previous error or end marker).
            let carried = (streamStatus ?? []).intersection([.hasBytesAvailable, .hasSpaceAvailable])
            streamStatus = carried.union(eventCode)
            print("连接成功")
        case .hasBytesAvailable:
            // Start from an empty set when no status exists yet so the flag
            // is not lost.
            streamStatus = (streamStatus ?? []).union(eventCode)
            print("有字节可读")
        case .hasSpaceAvailable:
            streamStatus = (streamStatus ?? []).union(eventCode)
            print("可以发送字节")
        case .errorOccurred:
            streamStatus = eventCode
            print("链接出现错误")
        case .endEncountered:
            streamStatus = eventCode
            print("链接结束")
        default:
            return
        }

        // Optional chaining covers both a missing delegate and a delegate
        // that does not implement the optional method.
        if let status = streamStatus {
            delegate?.didChange?(session: self, status: status)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment