diff --git a/Dreamio/NativeStreamCacheProxy.swift b/Dreamio/NativeStreamCacheProxy.swift index 9200a71..97308b6 100644 --- a/Dreamio/NativeStreamCacheProxy.swift +++ b/Dreamio/NativeStreamCacheProxy.swift @@ -67,6 +67,7 @@ final class CachedRangeStore { private let directory: URL private let byteBudget: Int64 private let fileManager: FileManager + private let lock = NSLock() private var chunks: [Chunk] = [] init(sessionID: String, byteBudget: Int64, fileManager: FileManager = .default) { @@ -79,6 +80,8 @@ final class CachedRangeStore { } func lookup(range: HTTPRange, maximumLength: Int64) -> Lookup? { + lock.lock() + defer { lock.unlock() } let requestedEnd = range.end ?? range.start + maximumLength - 1 var cursor = range.start var data = Data() @@ -115,6 +118,8 @@ final class CachedRangeStore { } func store(data: Data, start: Int64) { + lock.lock() + defer { lock.unlock() } guard !data.isEmpty else { return } @@ -133,6 +138,8 @@ final class CachedRangeStore { } func evictKeepingBytesNear(offset: Int64) { + lock.lock() + defer { lock.unlock() } let lowerBound = max(0, offset - byteBudget) let removed = chunks.filter { $0.end < lowerBound } chunks.removeAll { $0.end < lowerBound } @@ -146,6 +153,8 @@ final class CachedRangeStore { } func removeAll() { + lock.lock() + defer { lock.unlock() } try? fileManager.removeItem(at: directory) chunks.removeAll() } @@ -200,11 +209,12 @@ final class NativeStreamCacheProxy { private let prefetchLength: Int64 private var listener: NWListener? private let queue = DispatchQueue(label: "dreamio.native-stream-cache-proxy") + private let workQueue = DispatchQueue(label: "dreamio.native-stream-cache-proxy.work", attributes: .concurrent) - init(session: Session, byteBudget: Int64? = nil, fetchLength: Int64 = 8 * 1024 * 1024) { + init(session: Session, byteBudget: Int64? = nil, fetchLength: Int64 = 1024 * 1024) { self.session = session self.fetchLength = fetchLength - prefetchLength = fetchLength + prefetchLength = 4 * fetchLength let budget = byteBudget ?? max(30 * 1024 * 1024, (session.estimatedBitrate ?? 0) * 30 / 8) store = CachedRangeStore(sessionID: session.id, byteBudget: budget) } @@ -260,12 +270,26 @@ final class NativeStreamCacheProxy { connection.cancel() return } - let range = HTTPRange.parse(request.headers["range"]) - self.respond(to: range, on: connection) + self.workQueue.async { + self.respond(to: request, on: connection) + } } } - private func respond(to requestedRange: HTTPRange?, on connection: NWConnection) { + private func respond(to request: HTTPRequest, on connection: NWConnection) { + guard request.path.hasPrefix("/stream/") else { + sendStatus(404, on: connection) + return + } + if request.method == "HEAD" { + respondToHead(on: connection) + return + } + guard request.method == "GET" else { + sendStatus(405, on: connection) + return + } + let requestedRange = HTTPRange.parse(request.headers["range"]) let range = requestedRange ?? HTTPRange(start: 0, end: fetchLength - 1) let maximumLength = range.length ?? fetchLength if let lookup = store.lookup(range: range, maximumLength: maximumLength), lookup.isComplete { @@ -292,8 +316,8 @@ final class NativeStreamCacheProxy { if responseStatus == 206 { store.store(data: response.data, start: range.start) store.evictKeepingBytesNear(offset: range.start) - let contentType = response.headers["Content-Type"] as? String - let totalLength = totalLength(from: response.headers["Content-Range"] as? String) + let contentType = headerValue(response.headers, named: "Content-Type") + let totalLength = totalLength(from: headerValue(response.headers, named: "Content-Range")) send(data: response.data, statusCode: 206, rangeStart: range.start, totalLength: totalLength, contentType: contentType, on: connection) prefetch(after: range.start + Int64(response.data.count)) } else { @@ -304,6 +328,31 @@ final class NativeStreamCacheProxy { } } + private func respondToHead(on connection: NWConnection) { + guard let response = fetchHead() ?? fetch(range: HTTPRange(start: 0, end: 0)) else { + sendStatus(502, on: connection) + return + } + var headers = [ + "Accept-Ranges": response.statusCode == 206 ? "bytes" : headerValue(response.headers, named: "Accept-Ranges") ?? "bytes", + "Connection": "close" + ] + if let contentType = headerValue(response.headers, named: "Content-Type") { + headers["Content-Type"] = contentType + } + if let length = headerValue(response.headers, named: "Content-Length") { + headers["Content-Length"] = length + } else if let total = totalLength(from: headerValue(response.headers, named: "Content-Range")) { + headers["Content-Length"] = "\(total)" + } else { + headers["Content-Length"] = "0" + } +#if DEBUG + print("[DreamioStreamProxy] head status=\(response.statusCode) length=\(headers["Content-Length"] ?? "unknown")") +#endif + send(statusCode: 200, headers: headers, body: Data(), on: connection) + } + private func prefetch(after offset: Int64) { let range = HTTPRange(start: offset, end: offset + prefetchLength - 1) queue.async { [weak self] in @@ -318,11 +367,21 @@ final class NativeStreamCacheProxy { } private func fetch(range: HTTPRange) -> UpstreamResponse? { + fetch(request: upstreamRequest(for: range)) + } + + private func fetchHead() -> UpstreamResponse? { + var request = upstreamRequest(for: nil) + request.httpMethod = "HEAD" + return fetch(request: request) + } + + private func fetch(request: URLRequest) -> UpstreamResponse? { let semaphore = DispatchSemaphore(value: 0) var result: UpstreamResponse? - URLSession.shared.dataTask(with: upstreamRequest(for: range)) { data, response, _ in - if let http = response as? HTTPURLResponse, let data { - result = UpstreamResponse(statusCode: http.statusCode, headers: http.allHeaderFields, data: data) + URLSession.shared.dataTask(with: request) { data, response, _ in + if let http = response as? HTTPURLResponse { + result = UpstreamResponse(statusCode: http.statusCode, headers: http.allHeaderFields, data: data ?? Data()) } semaphore.signal() }.resume() @@ -350,7 +409,7 @@ final class NativeStreamCacheProxy { "Accept-Ranges": "none", "Connection": "close" ] - if let contentType = upstreamHeaders["Content-Type"] as? String { + if let contentType = headerValue(upstreamHeaders, named: "Content-Type") { headers["Content-Type"] = contentType } send(statusCode: statusCode, headers: headers, body: data, on: connection) @@ -368,6 +427,12 @@ final class NativeStreamCacheProxy { connection.send(content: response, completion: .contentProcessed { _ in connection.cancel() }) } + private func headerValue(_ headers: [AnyHashable: Any], named name: String) -> String? { + headers.first { key, _ in + String(describing: key).caseInsensitiveCompare(name) == .orderedSame + }?.value as? String + } + private func totalLength(from contentRange: String?) -> Int64? { guard let contentRange, let slash = contentRange.lastIndex(of: "/") else { return nil @@ -378,6 +443,8 @@ final class NativeStreamCacheProxy { } private struct HTTPRequest { + let method: String + let path: String let headers: [String: String] init?(data: Data) { @@ -385,8 +452,16 @@ private struct HTTPRequest { let headerBlock = string.components(separatedBy: "\r\n\r\n").first else { return nil } + let lines = headerBlock.components(separatedBy: "\r\n") + guard let requestLine = lines.first else { + return nil + } + let parts = requestLine.split(separator: " ") + guard parts.count >= 2 else { + return nil + } var headers: [String: String] = [:] - for line in headerBlock.components(separatedBy: "\r\n").dropFirst() { + for line in lines.dropFirst() { guard let separator = line.firstIndex(of: ":") else { continue } @@ -394,6 +469,8 @@ private struct HTTPRequest { let value = line[line.index(after: separator)...].trimmingCharacters(in: .whitespacesAndNewlines) headers[key] = value } + method = String(parts[0]).uppercased() + path = String(parts[1]) self.headers = headers } } diff --git a/docs/turns/2026-05-25-local-seek-buffer-vlc-playback.html b/docs/turns/2026-05-25-local-seek-buffer-vlc-playback.html index 5ed8643..80b63a0 100644 --- a/docs/turns/2026-05-25-local-seek-buffer-vlc-playback.html +++ b/docs/turns/2026-05-25-local-seek-buffer-vlc-playback.html @@ -92,7 +92,7 @@ pre { overflow: auto; background: #22242d; color: #f4f1ea; padding: 14px; border
66 unmodified lines6768697071726 unmodified lines79808182838430 unmodified lines11511611711811912012 unmodified lines1331341351361371387 unmodified lines14614714814915015148 unmodified lines20020120220320420520620720820921049 unmodified lines26026126226326426526626726826927027120 unmodified lines2922932942952962972982994 unmodified lines3043053063073083098 unmodified lines31831932032132232332432532632732821 unmodified lines35035135235335435535611 unmodified lines3683693703713723734 unmodified lines3783793803813823831 unmodified line3853863873883893903913921 unmodified line39439539639739839966 unmodified linesprivate let directory: URLprivate let byteBudget: Int64private let fileManager: FileManagerprivate var chunks: [Chunk] = []+init(sessionID: String, byteBudget: Int64, fileManager: FileManager = .default) {6 unmodified lines}+func lookup(range: HTTPRange, maximumLength: Int64) -> Lookup? {let requestedEnd = range.end ?? range.start + maximumLength - 1var cursor = range.startvar data = Data()30 unmodified lines}+func store(data: Data, start: Int64) {guard !data.isEmpty else {return}12 unmodified lines}+func evictKeepingBytesNear(offset: Int64) {let lowerBound = max(0, offset - byteBudget)let removed = chunks.filter { $0.end < lowerBound }chunks.removeAll { $0.end < lowerBound }7 unmodified lines}+func removeAll() {try? fileManager.removeItem(at: directory)chunks.removeAll()}48 unmodified linesprivate let prefetchLength: Int64private var listener: NWListener?private let queue = DispatchQueue(label: "dreamio.native-stream-cache-proxy")+init(session: Session, byteBudget: Int64? = nil, fetchLength: Int64 = 8 * 1024 * 1024) {self.session = sessionself.fetchLength = fetchLengthprefetchLength = fetchLengthlet budget = byteBudget ?? max(30 * 1024 * 1024, (session.estimatedBitrate ?? 0) * 30 / 8)store = CachedRangeStore(sessionID: session.id, byteBudget: budget)}49 unmodified linesconnection.cancel()return}let range = HTTPRange.parse(request.headers["range"])self.respond(to: range, on: connection)}}+private func respond(to requestedRange: HTTPRange?, on connection: NWConnection) {let range = requestedRange ?? HTTPRange(start: 0, end: fetchLength - 1)let maximumLength = range.length ?? fetchLengthif let lookup = store.lookup(range: range, maximumLength: maximumLength), lookup.isComplete {20 unmodified linesif responseStatus == 206 {store.store(data: response.data, start: range.start)store.evictKeepingBytesNear(offset: range.start)let contentType = response.headers["Content-Type"] as? Stringlet totalLength = totalLength(from: response.headers["Content-Range"] as? String)send(data: response.data, statusCode: 206, rangeStart: range.start, totalLength: totalLength, contentType: contentType, on: connection)prefetch(after: range.start + Int64(response.data.count))} else {4 unmodified lines}}+private func prefetch(after offset: Int64) {let range = HTTPRange(start: offset, end: offset + prefetchLength - 1)queue.async { [weak self] in8 unmodified lines}+private func fetch(range: HTTPRange) -> UpstreamResponse? {let semaphore = DispatchSemaphore(value: 0)var result: UpstreamResponse?URLSession.shared.dataTask(with: upstreamRequest(for: range)) { data, response, _ inif let http = response as? HTTPURLResponse, let data {result = UpstreamResponse(statusCode: http.statusCode, headers: http.allHeaderFields, data: data)}semaphore.signal()}.resume()21 unmodified lines"Accept-Ranges": "none","Connection": "close"]if let contentType = upstreamHeaders["Content-Type"] as? String {headers["Content-Type"] = contentType}send(statusCode: statusCode, headers: headers, body: data, on: connection)11 unmodified linesconnection.send(content: response, completion: .contentProcessed { _ in connection.cancel() })}+private func totalLength(from contentRange: String?) -> Int64? {guard let contentRange, let slash = contentRange.lastIndex(of: "/") else {return nil4 unmodified lines}+private struct HTTPRequest {let headers: [String: String]+init?(data: Data) {1 unmodified linelet headerBlock = string.components(separatedBy: "\r\n\r\n").first else {return nil}var headers: [String: String] = [:]for line in headerBlock.components(separatedBy: "\r\n").dropFirst() {guard let separator = line.firstIndex(of: ":") else {continue}1 unmodified linelet value = line[line.index(after: separator)...].trimmingCharacters(in: .whitespacesAndNewlines)headers[key] = value}self.headers = headers}}66 unmodified lines676869707172736 unmodified lines808182838485868730 unmodified lines11811912012112212312412512 unmodified lines1381391401411421431441457 unmodified lines15315415515615715815916048 unmodified lines20921021121221321421521621721821922049 unmodified lines27027127227327427527627727827928028128228328428528628728828929029129229329429520 unmodified lines3163173183193203213223234 unmodified lines3283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573588 unmodified lines36736836937037137237337437537637737837938038138238338438538638721 unmodified lines40941041141241341441511 unmodified lines4274284294304314324334344354364374384 unmodified lines4434444454464474484494501 unmodified line4524534544554564574584594604614624634644654664671 unmodified line46947047147247347447547666 unmodified linesprivate let directory: URLprivate let byteBudget: Int64private let fileManager: FileManagerprivate let lock = NSLock()private var chunks: [Chunk] = []+init(sessionID: String, byteBudget: Int64, fileManager: FileManager = .default) {6 unmodified lines}+func lookup(range: HTTPRange, maximumLength: Int64) -> Lookup? {lock.lock()defer { lock.unlock() }let requestedEnd = range.end ?? range.start + maximumLength - 1var cursor = range.startvar data = Data()30 unmodified lines}+func store(data: Data, start: Int64) {lock.lock()defer { lock.unlock() }guard !data.isEmpty else {return}12 unmodified lines}+func evictKeepingBytesNear(offset: Int64) {lock.lock()defer { lock.unlock() }let lowerBound = max(0, offset - byteBudget)let removed = chunks.filter { $0.end < lowerBound }chunks.removeAll { $0.end < lowerBound }7 unmodified lines}+func removeAll() {lock.lock()defer { lock.unlock() }try? fileManager.removeItem(at: directory)chunks.removeAll()}48 unmodified linesprivate let prefetchLength: Int64private var listener: NWListener?private let queue = DispatchQueue(label: "dreamio.native-stream-cache-proxy")private let workQueue = DispatchQueue(label: "dreamio.native-stream-cache-proxy.work", attributes: .concurrent)+init(session: Session, byteBudget: Int64? = nil, fetchLength: Int64 = 1024 * 1024) {self.session = sessionself.fetchLength = fetchLengthprefetchLength = 4 * fetchLengthlet budget = byteBudget ?? max(30 * 1024 * 1024, (session.estimatedBitrate ?? 0) * 30 / 8)store = CachedRangeStore(sessionID: session.id, byteBudget: budget)}49 unmodified linesconnection.cancel()return}self.workQueue.async {self.respond(to: request, on: connection)}}}+private func respond(to request: HTTPRequest, on connection: NWConnection) {guard request.path.hasPrefix("/stream/") else {sendStatus(404, on: connection)return}if request.method == "HEAD" {respondToHead(on: connection)return}guard request.method == "GET" else {sendStatus(405, on: connection)return}let requestedRange = HTTPRange.parse(request.headers["range"])let range = requestedRange ?? HTTPRange(start: 0, end: fetchLength - 1)let maximumLength = range.length ?? fetchLengthif let lookup = store.lookup(range: range, maximumLength: maximumLength), lookup.isComplete {20 unmodified linesif responseStatus == 206 {store.store(data: response.data, start: range.start)store.evictKeepingBytesNear(offset: range.start)let contentType = headerValue(response.headers, named: "Content-Type")let totalLength = totalLength(from: headerValue(response.headers, named: "Content-Range"))send(data: response.data, statusCode: 206, rangeStart: range.start, totalLength: totalLength, contentType: contentType, on: connection)prefetch(after: range.start + Int64(response.data.count))} else {4 unmodified lines}}+private func respondToHead(on connection: NWConnection) {guard let response = fetchHead() ?? fetch(range: HTTPRange(start: 0, end: 0)) else {sendStatus(502, on: connection)return}var headers = ["Accept-Ranges": response.statusCode == 206 ? "bytes" : headerValue(response.headers, named: "Accept-Ranges") ?? "bytes","Connection": "close"]if let contentType = headerValue(response.headers, named: "Content-Type") {headers["Content-Type"] = contentType}if let length = headerValue(response.headers, named: "Content-Length") {headers["Content-Length"] = length} else if let total = totalLength(from: headerValue(response.headers, named: "Content-Range")) {headers["Content-Length"] = "\(total)"} else {headers["Content-Length"] = "0"}#if DEBUGprint("[DreamioStreamProxy] head status=\(response.statusCode) length=\(headers["Content-Length"] ?? "unknown")")#endifsend(statusCode: 200, headers: headers, body: Data(), on: connection)}+private func prefetch(after offset: Int64) {let range = HTTPRange(start: offset, end: offset + prefetchLength - 1)queue.async { [weak self] in8 unmodified lines}+private func fetch(range: HTTPRange) -> UpstreamResponse? {fetch(request: upstreamRequest(for: range))}+private func fetchHead() -> UpstreamResponse? {var request = upstreamRequest(for: nil)request.httpMethod = "HEAD"return fetch(request: request)}+private func fetch(request: URLRequest) -> UpstreamResponse? {let semaphore = DispatchSemaphore(value: 0)var result: UpstreamResponse?URLSession.shared.dataTask(with: request) { data, response, _ inif let http = response as? HTTPURLResponse {result = UpstreamResponse(statusCode: http.statusCode, headers: http.allHeaderFields, data: data ?? Data())}semaphore.signal()}.resume()21 unmodified lines"Accept-Ranges": "none","Connection": "close"]if let contentType = headerValue(upstreamHeaders, named: "Content-Type") {headers["Content-Type"] = contentType}send(statusCode: statusCode, headers: headers, body: data, on: connection)11 unmodified linesconnection.send(content: response, completion: .contentProcessed { _ in connection.cancel() })}+private func headerValue(_ headers: [AnyHashable: Any], named name: String) -> String? {headers.first { key, _ inString(describing: key).caseInsensitiveCompare(name) == .orderedSame}?.value as? String}+private func totalLength(from contentRange: String?) -> Int64? {guard let contentRange, let slash = contentRange.lastIndex(of: "/") else {return nil4 unmodified lines}+private struct HTTPRequest {let method: Stringlet path: Stringlet headers: [String: String]+init?(data: Data) {1 unmodified linelet headerBlock = string.components(separatedBy: "\r\n\r\n").first else {return nil}let lines = headerBlock.components(separatedBy: "\r\n")guard let requestLine = lines.first else {return nil}let parts = requestLine.split(separator: " ")guard parts.count >= 2 else {return nil}var headers: [String: String] = [:]for line in lines.dropFirst() {guard let separator = line.firstIndex(of: ":") else {continue}1 unmodified linelet value = line[line.index(after: separator)...].trimmingCharacters(in: .whitespacesAndNewlines)headers[key] = value}method = String(parts[0]).uppercased()path = String(parts[1])self.headers = headers}}
Follow-up Beads issue: dreamio-6bv.