fix proxy seek buffering

This commit is contained in:
dirtydishes 2026-05-25 17:55:40 -04:00
parent bccae25937
commit 6b2e198846
2 changed files with 184 additions and 16 deletions

View file

@ -67,6 +67,7 @@ final class CachedRangeStore {
private let directory: URL
private let byteBudget: Int64
private let fileManager: FileManager
private let lock = NSLock()
private var chunks: [Chunk] = []
init(sessionID: String, byteBudget: Int64, fileManager: FileManager = .default) {
@ -79,6 +80,8 @@ final class CachedRangeStore {
}
func lookup(range: HTTPRange, maximumLength: Int64) -> Lookup? {
lock.lock()
defer { lock.unlock() }
let requestedEnd = range.end ?? range.start + maximumLength - 1
var cursor = range.start
var data = Data()
@ -115,6 +118,8 @@ final class CachedRangeStore {
}
func store(data: Data, start: Int64) {
lock.lock()
defer { lock.unlock() }
guard !data.isEmpty else {
return
}
@ -133,6 +138,8 @@ final class CachedRangeStore {
}
func evictKeepingBytesNear(offset: Int64) {
lock.lock()
defer { lock.unlock() }
let lowerBound = max(0, offset - byteBudget)
let removed = chunks.filter { $0.end < lowerBound }
chunks.removeAll { $0.end < lowerBound }
@ -146,6 +153,8 @@ final class CachedRangeStore {
}
func removeAll() {
lock.lock()
defer { lock.unlock() }
try? fileManager.removeItem(at: directory)
chunks.removeAll()
}
@ -200,11 +209,12 @@ final class NativeStreamCacheProxy {
private let prefetchLength: Int64
private var listener: NWListener?
private let queue = DispatchQueue(label: "dreamio.native-stream-cache-proxy")
private let workQueue = DispatchQueue(label: "dreamio.native-stream-cache-proxy.work", attributes: .concurrent)
init(session: Session, byteBudget: Int64? = nil, fetchLength: Int64 = 8 * 1024 * 1024) {
init(session: Session, byteBudget: Int64? = nil, fetchLength: Int64 = 1024 * 1024) {
self.session = session
self.fetchLength = fetchLength
prefetchLength = fetchLength
prefetchLength = 4 * fetchLength
let budget = byteBudget ?? max(30 * 1024 * 1024, (session.estimatedBitrate ?? 0) * 30 / 8)
store = CachedRangeStore(sessionID: session.id, byteBudget: budget)
}
@ -260,12 +270,26 @@ final class NativeStreamCacheProxy {
connection.cancel()
return
}
let range = HTTPRange.parse(request.headers["range"])
self.respond(to: range, on: connection)
self.workQueue.async {
self.respond(to: request, on: connection)
}
}
}
private func respond(to requestedRange: HTTPRange?, on connection: NWConnection) {
private func respond(to request: HTTPRequest, on connection: NWConnection) {
guard request.path.hasPrefix("/stream/") else {
sendStatus(404, on: connection)
return
}
if request.method == "HEAD" {
respondToHead(on: connection)
return
}
guard request.method == "GET" else {
sendStatus(405, on: connection)
return
}
let requestedRange = HTTPRange.parse(request.headers["range"])
let range = requestedRange ?? HTTPRange(start: 0, end: fetchLength - 1)
let maximumLength = range.length ?? fetchLength
if let lookup = store.lookup(range: range, maximumLength: maximumLength), lookup.isComplete {
@ -292,8 +316,8 @@ final class NativeStreamCacheProxy {
if responseStatus == 206 {
store.store(data: response.data, start: range.start)
store.evictKeepingBytesNear(offset: range.start)
let contentType = response.headers["Content-Type"] as? String
let totalLength = totalLength(from: response.headers["Content-Range"] as? String)
let contentType = headerValue(response.headers, named: "Content-Type")
let totalLength = totalLength(from: headerValue(response.headers, named: "Content-Range"))
send(data: response.data, statusCode: 206, rangeStart: range.start, totalLength: totalLength, contentType: contentType, on: connection)
prefetch(after: range.start + Int64(response.data.count))
} else {
@ -304,6 +328,31 @@ final class NativeStreamCacheProxy {
}
}
private func respondToHead(on connection: NWConnection) {
guard let response = fetchHead() ?? fetch(range: HTTPRange(start: 0, end: 0)) else {
sendStatus(502, on: connection)
return
}
var headers = [
"Accept-Ranges": response.statusCode == 206 ? "bytes" : headerValue(response.headers, named: "Accept-Ranges") ?? "bytes",
"Connection": "close"
]
if let contentType = headerValue(response.headers, named: "Content-Type") {
headers["Content-Type"] = contentType
}
if let length = headerValue(response.headers, named: "Content-Length") {
headers["Content-Length"] = length
} else if let total = totalLength(from: headerValue(response.headers, named: "Content-Range")) {
headers["Content-Length"] = "\(total)"
} else {
headers["Content-Length"] = "0"
}
#if DEBUG
print("[DreamioStreamProxy] head status=\(response.statusCode) length=\(headers["Content-Length"] ?? "unknown")")
#endif
send(statusCode: 200, headers: headers, body: Data(), on: connection)
}
private func prefetch(after offset: Int64) {
let range = HTTPRange(start: offset, end: offset + prefetchLength - 1)
queue.async { [weak self] in
@ -318,11 +367,21 @@ final class NativeStreamCacheProxy {
}
private func fetch(range: HTTPRange) -> UpstreamResponse? {
fetch(request: upstreamRequest(for: range))
}
private func fetchHead() -> UpstreamResponse? {
var request = upstreamRequest(for: nil)
request.httpMethod = "HEAD"
return fetch(request: request)
}
private func fetch(request: URLRequest) -> UpstreamResponse? {
let semaphore = DispatchSemaphore(value: 0)
var result: UpstreamResponse?
URLSession.shared.dataTask(with: upstreamRequest(for: range)) { data, response, _ in
if let http = response as? HTTPURLResponse, let data {
result = UpstreamResponse(statusCode: http.statusCode, headers: http.allHeaderFields, data: data)
URLSession.shared.dataTask(with: request) { data, response, _ in
if let http = response as? HTTPURLResponse {
result = UpstreamResponse(statusCode: http.statusCode, headers: http.allHeaderFields, data: data ?? Data())
}
semaphore.signal()
}.resume()
@ -350,7 +409,7 @@ final class NativeStreamCacheProxy {
"Accept-Ranges": "none",
"Connection": "close"
]
if let contentType = upstreamHeaders["Content-Type"] as? String {
if let contentType = headerValue(upstreamHeaders, named: "Content-Type") {
headers["Content-Type"] = contentType
}
send(statusCode: statusCode, headers: headers, body: data, on: connection)
@ -368,6 +427,12 @@ final class NativeStreamCacheProxy {
connection.send(content: response, completion: .contentProcessed { _ in connection.cancel() })
}
private func headerValue(_ headers: [AnyHashable: Any], named name: String) -> String? {
headers.first { key, _ in
String(describing: key).caseInsensitiveCompare(name) == .orderedSame
}?.value as? String
}
private func totalLength(from contentRange: String?) -> Int64? {
guard let contentRange, let slash = contentRange.lastIndex(of: "/") else {
return nil
@ -378,6 +443,8 @@ final class NativeStreamCacheProxy {
}
private struct HTTPRequest {
let method: String
let path: String
let headers: [String: String]
init?(data: Data) {
@ -385,8 +452,16 @@ private struct HTTPRequest {
let headerBlock = string.components(separatedBy: "\r\n\r\n").first else {
return nil
}
let lines = headerBlock.components(separatedBy: "\r\n")
guard let requestLine = lines.first else {
return nil
}
let parts = requestLine.split(separator: " ")
guard parts.count >= 2 else {
return nil
}
var headers: [String: String] = [:]
for line in headerBlock.components(separatedBy: "\r\n").dropFirst() {
for line in lines.dropFirst() {
guard let separator = line.firstIndex(of: ":") else {
continue
}
@ -394,6 +469,8 @@ private struct HTTPRequest {
let value = line[line.index(after: separator)...].trimmingCharacters(in: .whitespacesAndNewlines)
headers[key] = value
}
method = String(parts[0]).uppercased()
path = String(parts[1])
self.headers = headers
}
}

File diff suppressed because one or more lines are too long