Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // LangGraph streaming handler for IPC transport
- // Handles streaming from LangGraph assistant over dedicated event channels
- ipcMain.handle('lg/stream/start', async (event, payload) => {
- const {
- runId,
- eventChannel,
- input,
- context,
- command,
- config,
- }: {
- runId: string;
- eventChannel: string;
- input: any;
- context?: { model?: string; webSearch?: boolean; threadId?: string };
- command?: any;
- config?: any;
- } = payload;
- log.info(`Starting LangGraph stream ${runId} on channel ${eventChannel}`);
- log.debug(`Context:`, context);
- try {
- // Initialize LangGraph client with production credentials
- const client = new Client({
- apiUrl: 'http://localhost:2024/',
- });
- // Get the first assistant
- const assistants = await client.assistants.search({
- metadata: null,
- offset: 0,
- limit: 5,
- });
- if (!assistants || assistants.length === 0) {
- throw new Error('No assistants found');
- }
- const assistant = assistants[0];
- log.info(`Using assistant: ${assistant.assistant_id}`);
- // Use existing thread or create new one
- const threadId = context?.threadId;
- log.info(`Thread ID: ${threadId || 'creating new thread'}`);
- // Build config with context as configurable parameters
- const streamConfig = {
- ...config,
- configurable: {
- ...config?.configurable,
- ...(context && {
- model: context.model,
- webSearch: context.webSearch,
- }),
- },
- };
- // Start streaming from LangGraph - handle null vs string threadId properly
- const streamResponse = threadId
- ? client.runs.stream(threadId, assistant.assistant_id, {
- input: input || {},
- config: streamConfig,
- streamMode: 'messages-tuple',
- })
- : client.runs.stream(null, assistant.assistant_id, {
- input: input || {},
- config: streamConfig,
- streamMode: 'messages',
- });
- let newThreadId: string | null = null;
- // Stream events back to the renderer via the dedicated event channel
- // eslint-disable-next-line no-restricted-syntax
- for await (const chunk of streamResponse) {
- log.debug(`Stream chunk:`, chunk);
- // Capture thread_id from metadata events
- if (chunk.event === 'metadata' && chunk.data?.thread_id) {
- newThreadId = chunk.data.thread_id;
- log.info(`Thread ID from stream: ${newThreadId}`);
- }
- // Forward the stream event to the renderer
- event.sender.send(eventChannel, {
- event: chunk.event,
- data: chunk.data,
- id: runId,
- });
- }
- // Send thread_id back to renderer so it can be used for future messages
- if (newThreadId) {
- event.sender.send(eventChannel, {
- event: 'thread',
- data: { thread_id: newThreadId },
- id: runId,
- });
- }
- log.info(`Stream ${runId} completed successfully`);
- return { success: true, threadId: newThreadId };
- } catch (error) {
- log.error(`Error in LangGraph stream ${runId}:`, error);
- // Send error event to renderer
- event.sender.send(eventChannel, {
- event: 'error',
- data: {
- error: error instanceof Error ? error.name : 'StreamError',
- message: error instanceof Error ? error.message : 'Unknown error',
- },
- id: runId,
- });
- throw error;
- }
- });
- // LangGraph stream cancellation handler
- ipcMain.handle('lg/stream/cancel', async (event, payload) => {
- const { runId }: { runId: string } = payload;
- log.info(`Cancellation requested for stream ${runId}`);
- // TODO: Implement cancellation logic if LangGraph SDK supports it
- // For now, just acknowledge the cancellation request
- return { success: true, runId };
- });
Advertisement
Add Comment
Please, Sign In to add comment