Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /* __LC_ALLOW_ENTRYPOINT_SIDE_EFFECTS__ */
- 'use client';
- import type { Command, Config } from '@langchain/langgraph-sdk';
- import type {
- BagTemplate,
- GetConfigurableType,
- GetUpdateType,
- UseStreamTransport,
- } from './types';
/**
 * Minimal IPC surface the transport needs from the preload script.
 *
 * Only `on` is mandatory. The transport prefers `invoke` (request/response)
 * when it is present and falls back to fire-and-forget `send` otherwise.
 */
export type IpcRendererLike = {
  /** Subscribes `listener` to messages arriving on `channel`. */
  on: (
    channel: string,
    listener: (event: unknown, payload: unknown) => void,
  ) => void;
  /**
   * Unsubscribes a listener previously registered with `on`.
   * Optional: when absent, the transport cannot detach its per-run listener.
   */
  off?: (
    channel: string,
    listener: (event: unknown, payload: unknown) => void,
  ) => void;
  /** Fire-and-forget message; used when `invoke` is not available. */
  send?: (channel: string, payload: unknown) => void;
  /** Request/response message; its settlement marks the end of a stream. */
  invoke?: (channel: string, payload: unknown) => Promise<unknown>;
};
/** Configuration for an IPC-backed stream transport. */
export type IpcStreamTransportOptions = {
  /**
   * Channel to request starting a stream.
   * The main process should start producing events on the provided event channel.
   */
  requestChannel: string;
  /**
   * Base name for the dedicated per-run event channel.
   * The renderer will listen on `${eventsBaseChannel}:${runId}`.
   */
  eventsBaseChannel: string;
  /**
   * Optional channel to request cancellation of an in-flight stream.
   * The renderer will send `{ runId }` when the AbortSignal triggers.
   */
  cancelChannel?: string;
  /**
   * IPC surface exposed from preload.
   */
  ipc: IpcRendererLike;
  /**
   * When using `send` (fire-and-forget), set a sentinel event name that indicates
   * completion of the stream so the generator can finish and clean up.
   * If `invoke` is available, this is not required (completion is inferred when
   * the invoke handler resolves).
   * Defaults to `"__complete__"`.
   */
  completeEventName?: string;
  /**
   * Optional callback invoked with the `thread_id` carried by a `thread` or
   * `metadata` stream event. It may fire more than once per run if several
   * such events arrive.
   */
  onThreadId?: (threadId: string) => void;
};
/**
 * IPC transport for useStreamCustom that streams events over Electron IPC instead of HTTP/SSE.
 *
 * Expected main-process behavior:
 * - Handle `requestChannel` with payload `{ runId, eventChannel, input, context, command, config }`
 * - Emit stream events on `eventChannel` with shape `{ event, data, id? }`
 * - Optionally handle `cancelChannel` with payload `{ runId }` to cancel
 * - If using fire-and-forget `send`, emit a final sentinel event named `completeEventName`
 *   (default `"__complete__"`) so the generator can complete.
 *
 * Delivery guarantees of the returned generator:
 * - Events received before completion are always yielded, even if completion
 *   (sentinel, `invoke` settling, or a start failure) happens before the
 *   consumer pulls them.
 * - Aborting discards events that were buffered but not yet consumed.
 * - If the signal is already aborted when `stream` is called, nothing is sent
 *   over IPC and the generator completes immediately.
 *
 * This class does not depend on Electron directly; pass a preload-exposed `ipc` adapter.
 */
export class IpcStreamTransport<
  StateType extends Record<string, unknown> = Record<string, unknown>,
  Bag extends BagTemplate = BagTemplate,
> implements UseStreamTransport<StateType, Bag>
{
  // eslint-disable-next-line no-useless-constructor, no-empty-function
  constructor(private readonly options: IpcStreamTransportOptions) {}

  /**
   * Starts a run in the main process and bridges its IPC events into an
   * async generator.
   *
   * @param payload Run parameters forwarded to the main process, plus the
   *   `AbortSignal` that cancels the run.
   * @returns An async generator yielding `{ id?, event, data }` objects. Start
   *   and invocation failures are surfaced as a single `error` event rather
   *   than as a rejection.
   */
  async stream(payload: {
    input: GetUpdateType<Bag, StateType> | null | undefined;
    context: GetConfigurableType<Bag> | undefined;
    command: Command | undefined;
    config: (Config & { configurable?: GetConfigurableType<Bag> }) | undefined;
    signal: AbortSignal;
  }): Promise<AsyncGenerator<{ id?: string; event: string; data: unknown }>> {
    type StreamEvent = { id?: string; event: string; data: unknown };

    const { options } = this;
    const runId = crypto.randomUUID();
    const eventChannel = `${options.eventsBaseChannel}:${runId}`;
    const completeEventName = options.completeEventName ?? '__complete__';

    // Bridge state between the push-based IPC listener and the pull-based generator.
    const queue: StreamEvent[] = [];
    let nextResolver: ((value: IteratorResult<StreamEvent>) => void) | null =
      null;
    // `done` means "the producer side is closed"; buffered events may remain.
    let done = false;
    // Callbacks that undo every subscription made for this run.
    const teardown: Array<() => void> = [];

    const pushEvent = (evt: StreamEvent) => {
      // Ignore late events (e.g. when the adapter has no `off`).
      if (done) return;
      if (nextResolver) {
        // A consumer is parked, which implies the queue is empty: hand over directly.
        const resolve = nextResolver;
        nextResolver = null;
        resolve({ value: evt, done: false });
        return;
      }
      queue.push(evt);
    };

    const finish = () => {
      if (done) return;
      done = true;
      for (const release of teardown.splice(0)) {
        try {
          release();
        } catch {
          // Cleanup is best-effort; one failing unsubscribe must not block the rest.
        }
      }
      if (nextResolver) {
        const resolve = nextResolver;
        nextResolver = null;
        resolve({ value: undefined, done: true });
      }
    };

    const readString = (
      source: unknown,
      key: 'name' | 'message',
      fallback: string,
    ): string => {
      if (source && typeof source === 'object' && key in source) {
        const value = (source as Record<string, unknown>)[key];
        if (typeof value === 'string') return value;
      }
      return fallback;
    };

    const toErrorEvent = (
      error: unknown,
      fallbackName: string,
      fallbackMessage: string,
    ): StreamEvent => ({
      event: 'error',
      data: {
        error: readString(error, 'name', fallbackName),
        message: readString(error, 'message', fallbackMessage),
      },
    });

    const listener = (_ipcEvent: unknown, raw: unknown) => {
      if (done) return;
      // Expecting payload shape `{ event: string, data: unknown, id?: string }`
      const msg = raw as { id?: string; event?: unknown; data?: unknown };
      if (!msg || typeof msg.event !== 'string') return;
      // Intercept completion sentinel for fire-and-forget mode
      if (msg.event === completeEventName) {
        finish();
        return;
      }
      // Capture thread_id from thread or metadata events
      if (
        options.onThreadId &&
        (msg.event === 'thread' || msg.event === 'metadata')
      ) {
        const threadId = (
          msg.data as { thread_id?: unknown } | null | undefined
        )?.thread_id;
        if (typeof threadId === 'string' && threadId) {
          options.onThreadId(threadId);
        }
      }
      pushEvent({ id: msg.id, event: msg.event, data: msg.data });
    };

    const abort = () => {
      if (done) return;
      // The consumer asked to stop: drop anything not yet delivered.
      queue.length = 0;
      try {
        // Request cancellation upstream first
        if (options.cancelChannel) {
          const cancelPayload = { runId };
          if (options.ipc.invoke) {
            // eslint-disable-next-line no-void
            void options.ipc
              .invoke(options.cancelChannel, cancelPayload)
              .catch(() => {
                // Cancellation is best-effort; avoid an unhandled rejection.
              });
          } else {
            options.ipc.send?.(options.cancelChannel, cancelPayload);
          }
        }
      } finally {
        // Always release local resources, even if the cancel request threw.
        finish();
      }
    };

    const waitForNext = () =>
      new Promise<IteratorResult<StreamEvent>>((resolve) => {
        nextResolver = resolve;
      });

    async function* generator(): AsyncGenerator<StreamEvent> {
      try {
        // eslint-disable-next-line no-constant-condition
        while (true) {
          // Drain buffered events before honoring `done`, so nothing that
          // arrived ahead of completion is lost.
          const queued = queue.shift();
          if (queued !== undefined) {
            yield queued;
            // eslint-disable-next-line no-continue
            continue;
          }
          if (done) return;
          // eslint-disable-next-line no-await-in-loop
          const next = await waitForNext();
          if (next.done) return;
          yield next.value;
        }
      } finally {
        // Ensure cleanup if consumer breaks early
        finish();
      }
    }

    if (payload.signal.aborted) {
      // Nothing was requested from the main process, so there is nothing to
      // cancel or subscribe to; the generator completes on first pull.
      done = true;
      return generator();
    }

    // Subscribe to the dedicated event channel
    options.ipc.on(eventChannel, listener);
    teardown.push(() => options.ipc.off?.(eventChannel, listener));
    payload.signal.addEventListener('abort', abort, { once: true });
    teardown.push(() => payload.signal.removeEventListener('abort', abort));

    // Start the stream in main: prefer invoke so we can infer completion when it resolves
    const startPayload = {
      runId,
      eventChannel,
      input: payload.input,
      context: payload.context,
      command: payload.command,
      config: payload.config,
    };
    try {
      if (options.ipc.invoke) {
        // eslint-disable-next-line no-void
        void options.ipc
          .invoke(options.requestChannel, startPayload)
          .catch((error: unknown) => {
            // Surface as stream error event to consumers
            pushEvent(
              toErrorEvent(error, 'InvokeError', 'Stream invocation failed'),
            );
          })
          .finally(() => {
            // Completion of invoke means the stream is finished
            finish();
          });
      } else if (options.ipc.send) {
        // Fire-and-forget start; rely on completion sentinel event
        options.ipc.send(options.requestChannel, startPayload);
      } else {
        // Without either method the run can never start; fail fast instead
        // of leaving the consumer waiting forever.
        pushEvent({
          event: 'error',
          data: {
            error: 'StartError',
            message: 'IPC adapter exposes neither `invoke` nor `send`',
          },
        });
        finish();
      }
    } catch (error) {
      pushEvent(toErrorEvent(error, 'StartError', 'Failed to start stream'));
      // If starting failed synchronously, finish the generator
      finish();
    }

    return generator();
  }
}
/**
 * Minimal usage example (renderer):
 *
 *   const [threadId, setThreadId] = useState<string | null>(null);
 *
 *   const transport = new IpcStreamTransport({
 *     ipc: window.electron.ipc, // preload-exposed
 *     requestChannel: "lg/stream/start",
 *     eventsBaseChannel: "lg/stream/events",
 *     cancelChannel: "lg/stream/cancel",
 *     onThreadId: (newThreadId) => setThreadId(newThreadId),
 *     // completeEventName: "__complete__", // only for fire-and-forget send mode
 *   });
 *
 *   const { submit, values, messages, isLoading, stop } = useStream({
 *     transport,
 *     messagesKey: "messages",
 *     initialValues: { messages: [] },
 *   });
 *
 *   // Use threadId in context when submitting messages
 *   await submit({ messages: [userMessage] }, { context: { threadId } });
 */
Advertisement
Add Comment
Please, Sign In to add comment