Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Switching schemas on an Oracle connection no longer hangs on an infinite loading spinner. Oracle now switches by schema like BigQuery, the sidebar lists every schema with its tables loading on expand, Oracle queries respect the query timeout setting and can be cancelled, and a schema load that fails now shows an error with a Retry button instead of spinning forever. Requires the updated Oracle plugin for the full fix. (#1807)

## [0.55.0] - 2026-07-04

### Added
Expand Down
271 changes: 168 additions & 103 deletions Plugins/OracleDriverPlugin/OracleConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct OracleError: Error {
case authVersionNotSupported
case authConnectionDropped(phase: String?)
case loginTimedOut
case queryTimedOut
case nativeEncryptionFailed
}

Expand Down Expand Up @@ -127,6 +128,7 @@ final class OracleConnectionWrapper: @unchecked Sendable {
private struct LockedState: Sendable {
var isConnected = false
var nioConnection: OracleNIO.OracleConnection?
var queryTimeoutSeconds = 0
}

private let state = OSAllocatedUnfairLock(initialState: LockedState())
Expand Down Expand Up @@ -297,13 +299,19 @@ final class OracleConnectionWrapper: @unchecked Sendable {
return String(localized: "This account uses a password verifier the database driver does not support.")
case .loginTimedOut:
return loginTimeoutMessage
case .queryTimedOut:
return queryTimeoutMessage
case .nativeEncryptionFailed:
return nativeEncryptionFailureMessage
case .generic, .notConnected, .connectionFailed, .queryFailed, .protocolError:
return serverDetail
}
}

private static let queryTimeoutMessage = String(
localized: "The query did not finish within the configured timeout, so the connection was reset. Run the query again."
)

private func mapQueryError(_ sqlError: OracleSQLError) -> OracleError {
guard Self.isChannelFatal(sqlError) else {
return OracleError(message: sqlError.serverInfo?.message ?? sqlError.description)
Expand Down Expand Up @@ -340,71 +348,71 @@ final class OracleConnectionWrapper: @unchecked Sendable {
}
}

func applyQueryTimeout(_ seconds: Int) {
state.withLock { $0.queryTimeoutSeconds = max(0, seconds) }
}

func abortCurrentQuery() {
guard isConnected else { return }
osLogger.notice("Aborting Oracle query by closing the connection; the driver has no server-side cancel")
disconnect()
}

// MARK: - Query Execution

func executeQuery(_ query: String) async throws -> OracleQueryResult {
let connection = try state.withLock { current -> OracleNIO.OracleConnection in
private func requireConnection() throws -> OracleNIO.OracleConnection {
try state.withLock { current in
guard let conn = current.nioConnection, current.isConnected else {
throw OracleError.notConnected
}
return conn
}
}

/// Races the operation against the configured query timeout. On timeout the
/// connection is closed first, which fails the in-flight OracleNIO call even
/// if it ignores task cancellation, so the group can always unwind.
private func withQueryDeadline<T: Sendable>(
_ operation: @escaping @Sendable () async throws -> T
) async throws -> T {
let timeoutSeconds = state.withLock { $0.queryTimeoutSeconds }
guard timeoutSeconds > 0 else { return try await operation() }

return try await withThrowingTaskGroup(of: T?.self) { group in
group.addTask { try await operation() }
group.addTask {
try await Task.sleep(nanoseconds: UInt64(timeoutSeconds) * 1_000_000_000)
return nil
}
defer { group.cancelAll() }
guard let first = try await group.next(), let result = first else {
disconnect()
throw TimeoutError(seconds: Double(timeoutSeconds))
}
return result
}
}

private func queryTimeoutError(_ timeout: TimeoutError) -> OracleError {
osLogger.error("Oracle query timed out after \(Int(timeout.seconds), privacy: .public)s; the connection was closed to recover")
return OracleError(message: Self.queryTimeoutMessage, category: .queryTimedOut)
}

func executeQuery(_ query: String) async throws -> OracleQueryResult {
let connection = try requireConnection()

// OracleNIO does not support concurrent queries on a single connection.
// Serialize all queries to prevent state-machine corruption.
await queryGate.acquire()

do {
let statement = OracleStatement(stringLiteral: query)
let stream = try await connection.execute(statement, logger: nioLogger)

// Read column metadata from stream (available even with 0 rows)
var columns: [String] = []
for col in stream.columns {
columns.append(col.name)
}
osLogger.debug("Oracle columns: \(columns.count) — \(columns.joined(separator: ", "))")

var columnTypeNames: [String] = []
var allRows: [[PluginCellValue]] = []
var didReadTypes = false
var truncated = false

for try await row in stream {
var rowValues: [PluginCellValue] = []
for cell in row {
if !didReadTypes {
columnTypeNames.append(oracleTypeName(cell.dataType))
}
if cell.bytes == nil {
rowValues.append(.null)
} else if cell.dataType == .raw || cell.dataType == .longRAW || cell.dataType == .blob,
let bytes = cell.bytes {
rowValues.append(.bytes(Data(bytes.readableBytesView)))
} else {
rowValues.append(PluginCellValue.fromOptional(decodeCell(cell)))
}
}
didReadTypes = true
allRows.append(rowValues)
if allRows.count >= PluginRowLimits.emergencyMax {
truncated = true
break
}
}

if !didReadTypes {
columnTypeNames = Array(repeating: "unknown", count: columns.count)
let result = try await withQueryDeadline { [self] in
try await collectRows(query, on: connection)
}

await queryGate.release()
return OracleQueryResult(
columns: columns,
columnTypeNames: columnTypeNames,
rows: allRows,
affectedRows: allRows.count,
isTruncated: truncated
)
return result
} catch let timeout as TimeoutError {
throw queryTimeoutError(timeout)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Release the Oracle query gate on timeout

When an Oracle query times out while another operation has already passed requireConnection() and is waiting in queryGate.acquire() (for example a schema reload or a second tab query), this timeout branch throws without releasing the gate. Because withQueryDeadline closes the connection and then throws TimeoutError, the queued waiter is never resumed, leaving that operation stuck indefinitely instead of failing/reconnecting; the streaming timeout branch has the same missing release.

Useful? React with 👍 / 👎.

} catch let sqlError as OracleSQLError {
await queryGate.release()
throw mapQueryError(sqlError)
Expand All @@ -420,85 +428,142 @@ final class OracleConnectionWrapper: @unchecked Sendable {
}
}

private func collectRows(
_ query: String,
on connection: OracleNIO.OracleConnection
) async throws -> OracleQueryResult {
let statement = OracleStatement(stringLiteral: query)
let stream = try await connection.execute(statement, logger: nioLogger)

// Read column metadata from stream (available even with 0 rows)
var columns: [String] = []
for col in stream.columns {
columns.append(col.name)
}
osLogger.debug("Oracle columns (\(columns.count)): \(columns.joined(separator: ", "))")

var columnTypeNames: [String] = []
var allRows: [[PluginCellValue]] = []
var didReadTypes = false
var truncated = false

for try await row in stream {
var rowValues: [PluginCellValue] = []
for cell in row {
if !didReadTypes {
columnTypeNames.append(oracleTypeName(cell.dataType))
}
if cell.bytes == nil {
rowValues.append(.null)
} else if cell.dataType == .raw || cell.dataType == .longRAW || cell.dataType == .blob,
let bytes = cell.bytes {
rowValues.append(.bytes(Data(bytes.readableBytesView)))
} else {
rowValues.append(PluginCellValue.fromOptional(decodeCell(cell)))
}
}
didReadTypes = true
allRows.append(rowValues)
if allRows.count >= PluginRowLimits.emergencyMax {
truncated = true
break
}
}

if !didReadTypes {
columnTypeNames = Array(repeating: "unknown", count: columns.count)
}

return OracleQueryResult(
columns: columns,
columnTypeNames: columnTypeNames,
rows: allRows,
affectedRows: allRows.count,
isTruncated: truncated
)
}

// MARK: - Streaming Query

func streamQuery(
_ query: String,
continuation: AsyncThrowingStream<PluginStreamElement, Error>.Continuation
) async throws {
let connection = try state.withLock { current -> OracleNIO.OracleConnection in
guard let conn = current.nioConnection, current.isConnected else {
throw OracleError.notConnected
}
return conn
}
let connection = try requireConnection()

await queryGate.acquire()

do {
let statement = OracleStatement(stringLiteral: query)
let stream = try await connection.execute(statement, logger: nioLogger)

var columns: [String] = []
for col in stream.columns {
columns.append(col.name)
try await withQueryDeadline { [self] in
try await streamRows(query, on: connection, continuation: continuation)
}
await queryGate.release()
continuation.finish()
} catch let timeout as TimeoutError {
throw queryTimeoutError(timeout)
} catch let sqlError as OracleSQLError {
await queryGate.release()
throw mapQueryError(sqlError)
} catch is CancellationError {
await queryGate.release()
throw CancellationError()
} catch {
await queryGate.release()
throw OracleError(message: "Query execution failed: \(String(describing: error))")
}
}

private func streamRows(
_ query: String,
on connection: OracleNIO.OracleConnection,
continuation: AsyncThrowingStream<PluginStreamElement, Error>.Continuation
) async throws {
let statement = OracleStatement(stringLiteral: query)
let stream = try await connection.execute(statement, logger: nioLogger)

var columnTypeNames: [String] = []
var headerSent = false
var columns: [String] = []
for col in stream.columns {
columns.append(col.name)
}

for try await row in stream {
if Task.isCancelled {
await queryGate.release()
continuation.finish(throwing: CancellationError())
return
}
var columnTypeNames: [String] = []
var headerSent = false

var rowValues: [PluginCellValue] = []
for cell in row {
if !headerSent {
columnTypeNames.append(oracleTypeName(cell.dataType))
}
if cell.bytes == nil {
rowValues.append(.null)
} else if cell.dataType == .raw || cell.dataType == .longRAW || cell.dataType == .blob,
let bytes = cell.bytes {
rowValues.append(.bytes(Data(bytes.readableBytesView)))
} else {
rowValues.append(PluginCellValue.fromOptional(decodeCell(cell)))
}
}
for try await row in stream {
try Task.checkCancellation()

var rowValues: [PluginCellValue] = []
for cell in row {
if !headerSent {
continuation.yield(.header(PluginStreamHeader(
columns: columns,
columnTypeNames: columnTypeNames
)))
headerSent = true
columnTypeNames.append(oracleTypeName(cell.dataType))
}
if cell.bytes == nil {
rowValues.append(.null)
} else if cell.dataType == .raw || cell.dataType == .longRAW || cell.dataType == .blob,
let bytes = cell.bytes {
rowValues.append(.bytes(Data(bytes.readableBytesView)))
} else {
rowValues.append(PluginCellValue.fromOptional(decodeCell(cell)))
}

continuation.yield(.rows([rowValues]))
}

if !headerSent {
columnTypeNames = Array(repeating: "unknown", count: columns.count)
continuation.yield(.header(PluginStreamHeader(
columns: columns,
columnTypeNames: columnTypeNames
)))
headerSent = true
}

await queryGate.release()
continuation.finish()
} catch let sqlError as OracleSQLError {
await queryGate.release()
throw mapQueryError(sqlError)
} catch is CancellationError {
await queryGate.release()
throw CancellationError()
} catch {
await queryGate.release()
throw OracleError(message: "Query execution failed: \(String(describing: error))")
continuation.yield(.rows([rowValues]))
}

if !headerSent {
columnTypeNames = Array(repeating: "unknown", count: columns.count)
continuation.yield(.header(PluginStreamHeader(
columns: columns,
columnTypeNames: columnTypeNames
)))
}
}

Expand Down
Loading
Loading