Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions packages/opencode/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,13 +652,18 @@ export namespace Server {
},
}),
async (c) => {
const sessions = await Array.fromAsync(Session.list())
pipe(
await Array.fromAsync(Session.list()),
// Stream sessions one at a time to avoid loading all into memory at once
const sessions: Session.Info[] = []
for await (const session of Session.list()) {
sessions.push(session)
}
// Filter and sort in-place
const result = pipe(
sessions,
filter((s) => !s.time.archived),
sortBy((s) => s.time.updated),
)
return c.json(sessions)
return c.json(result)
},
)
.get(
Expand Down
47 changes: 32 additions & 15 deletions packages/opencode/src/session/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export namespace Session {
},
}
log.info("created", result)
await Storage.write(["session", Instance.project.id, result.id], result)
await Storage.writeSession(["session", Instance.project.id, result.id], result)
Bus.publish(Event.Created, {
info: result,
})
Expand Down Expand Up @@ -248,6 +248,8 @@ export namespace Session {
editor(draft)
draft.time.updated = Date.now()
})
// Update SQLite index
await Storage.SQLite.writeSession(result)
Bus.publish(Event.Updated, {
info: result,
})
Expand Down Expand Up @@ -275,18 +277,32 @@ export namespace Session {
},
)

/**
* List sessions for the current project.
* Uses SQLite for memory-efficient streaming - session data is loaded one at a time.
*/
export async function* list() {
const project = Instance.project
for (const item of await Storage.list(["session", project.id])) {
yield Storage.read<Info>(item)
// Use SQLite's streaming list for memory efficiency
for await (const sessionId of Storage.SQLite.listSessionIds(project.id)) {
try {
// Try SQLite first for faster access
yield await Storage.SQLite.readSession<Info>(sessionId)
} catch {
// Fallback to file storage if not in SQLite yet
yield Storage.read<Info>(["session", project.id, sessionId])
}
}
}

/**
* Get child sessions for a parent session.
* Uses streaming to avoid loading all sessions into memory.
*/
export const children = fn(Identifier.schema("session"), async (parentID) => {
const project = Instance.project
const result = [] as Session.Info[]
for (const item of await Storage.list(["session", project.id])) {
const session = await Storage.read<Info>(item)
// Stream sessions one at a time instead of loading all into memory
for await (const session of list()) {
if (session.parentID !== parentID) continue
result.push(session)
}
Expand All @@ -301,13 +317,14 @@ export namespace Session {
await remove(child.id)
}
await unshare(sessionID).catch(() => {})
for (const msg of await Storage.list(["message", sessionID])) {
for (const part of await Storage.list(["part", msg.at(-1)!])) {
await Storage.remove(part)
// Use streaming to avoid loading all messages/parts into memory
for await (const msgId of Storage.SQLite.listMessageIds(sessionID)) {
for await (const partId of Storage.SQLite.listPartIds(msgId)) {
await Storage.removePart(["part", msgId, partId], partId)
}
await Storage.remove(msg)
await Storage.removeMessage(["message", sessionID, msgId], msgId)
}
await Storage.remove(["session", project.id, sessionID])
await Storage.removeSession(["session", project.id, sessionID], sessionID)
Bus.publish(Event.Deleted, {
info: session,
})
Expand All @@ -317,7 +334,7 @@ export namespace Session {
})

export const updateMessage = fn(MessageV2.Info, async (msg) => {
await Storage.write(["message", msg.sessionID, msg.id], msg)
await Storage.writeMessage(["message", msg.sessionID, msg.id], msg)
Bus.publish(MessageV2.Event.Updated, {
info: msg,
})
Expand All @@ -330,7 +347,7 @@ export namespace Session {
messageID: Identifier.schema("message"),
}),
async (input) => {
await Storage.remove(["message", input.sessionID, input.messageID])
await Storage.removeMessage(["message", input.sessionID, input.messageID], input.messageID)
Bus.publish(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
Expand All @@ -346,7 +363,7 @@ export namespace Session {
partID: Identifier.schema("part"),
}),
async (input) => {
await Storage.remove(["part", input.messageID, input.partID])
await Storage.removePart(["part", input.messageID, input.partID], input.partID)
Bus.publish(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
Expand All @@ -371,7 +388,7 @@ export namespace Session {
export const updatePart = fn(UpdatePartInput, async (input) => {
const part = "delta" in input ? input.part : input
const delta = "delta" in input ? input.delta : undefined
await Storage.write(["part", part.messageID, part.id], part)
await Storage.writePart(["part", part.messageID, part.id], part)
Bus.publish(MessageV2.Event.PartUpdated, {
part,
delta,
Expand Down
33 changes: 24 additions & 9 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -541,24 +541,39 @@ export namespace MessageV2 {
return convertToModelMessages(result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")))
}

/**
* Stream messages for a session in descending order (newest first).
* Uses SQLite for memory-efficient streaming - messages are loaded one at a time.
*/
export const stream = fn(Identifier.schema("session"), async function* (sessionID) {
const list = await Array.fromAsync(await Storage.list(["message", sessionID]))
for (let i = list.length - 1; i >= 0; i--) {
// Use SQLite's streaming list for memory efficiency
// Messages are already ordered DESC in SQLite
for await (const messageId of Storage.SQLite.listMessageIds(sessionID)) {
yield await get({
sessionID,
messageID: list[i][2],
messageID: messageId,
})
}
})

/**
* Get all parts for a message.
* Uses SQLite for memory-efficient retrieval.
*/
export const parts = fn(Identifier.schema("message"), async (messageID) => {
const result = [] as MessageV2.Part[]
for (const item of await Storage.list(["part", messageID])) {
const read = await Storage.read<MessageV2.Part>(item)
result.push(read)
// Try to use SQLite's optimized parts list first
try {
return await Storage.SQLite.listParts<MessageV2.Part>(messageID)
} catch {
// Fallback to file-based storage
const result = [] as MessageV2.Part[]
for await (const item of Storage.listStream(["part", messageID])) {
const read = await Storage.read<MessageV2.Part>(item)
result.push(read)
}
result.sort((a, b) => (a.id > b.id ? 1 : -1))
return result
}
result.sort((a, b) => (a.id > b.id ? 1 : -1))
return result
})

export const get = fn(
Expand Down
Loading