// Websocket endpoint that tracks the backend servers connected to this API.
// Each connected server periodically sends a JSON `ServerStatus`; the latest
// status per connection is kept in `state` and queried by `getAvailableServer`.
//
// All mutable file-level state below is guarded by `lock` (a non-recursive mutex).

const z = @import("std");
const ws = @import("websocket");
const req = @import("req.zig");
const net = @import("net-util.zig");
const config = @import("config.zig");
const builtin = @import("builtin");

const alloc = z.heap.page_allocator;

/// Guards `is_listening`, `stopped`, `server_ready`, `server` teardown and `state`.
var lock = z.Thread.Mutex{};
var is_listening: bool = true;
var server: ws.Server = undefined;
/// True only between a successful `ws.Server.init` and the matching `deinit`.
var server_ready: bool = false;
var stopped: bool = false;

const u = struct {
    usingnamespace @import("util.zig");
    usingnamespace @import("userdefs.zig");
    usingnamespace @import("user.zig");
    usingnamespace @import("userdata.zig");
};

const ArrayList = z.ArrayList;
const ErrRes = net.ErrorResponse;
const OkRes = net.OkResponse;
const Message = ws.Message;
const Handshake = ws.Handshake;
const memeql = z.mem.eql;

/// A context whose `lastUpdate` is older than this (in milliseconds) is evicted.
const stale_after_ms = 20000;
/// Pause between two passes of the check loop (5 seconds).
const check_interval_ns = 5 * z.time.ns_per_s;

/// Status report parsed from the JSON a server sends. Field names are part of
/// the wire format and must not be renamed.
const ServerStatus = struct {
    lastUpdate: u64,
    loadedModel: ?[]const u8 = null,
    isBusy: bool,
    domain: []const u8,
    msg: ?[]const u8 = null,
};

/// Per-connection record: the last reported status plus the connection it came from.
const ServerContext = struct {
    status: ServerStatus,
    conn: *ws.Conn,
    uuid: [36]u8,
};

/// Owns every live `ServerContext`. The public methods take `lock` themselves,
/// so they must NOT be called while the caller already holds it.
const StateManager = struct {
    const Self = StateManager;

    contexts: ArrayList(*ServerContext),

    pub fn init() Self {
        return .{ .contexts = ArrayList(*ServerContext).init(alloc) };
    }

    /// Destroys every context and releases the list. Does not take `lock`;
    /// the caller is expected to hold it. The list is left empty (not dangling)
    /// so late readers iterate zero items instead of freed memory.
    pub fn deinit(self: *Self) void {
        for (self.contexts.items) |ctx| alloc.destroy(ctx);
        self.contexts.deinit();
        self.contexts = ArrayList(*ServerContext).init(alloc);
    }

    /// Allocates a fully initialized context for `conn` and registers it.
    /// The context is only published to the list once every field is set, so
    /// the check loop can never observe an undefined `conn` or `status.msg`.
    pub fn createContext(self: *Self, conn: *ws.Conn) !*ServerContext {
        lock.lock();
        defer lock.unlock();

        const ctx = try alloc.create(ServerContext);
        errdefer alloc.destroy(ctx); // do not leak when append fails
        ctx.* = .{
            .status = .{
                .lastUpdate = 0,
                .loadedModel = "",
                .isBusy = false,
                .domain = "",
            },
            .conn = conn,
            .uuid = net.uuidv4(),
        };
        try self.contexts.append(ctx);
        return ctx;
    }

    /// Removes and destroys `ctx` if it is still registered; otherwise a no-op.
    pub fn deleteContext(self: *Self, ctx: *ServerContext) void {
        lock.lock();
        defer lock.unlock();

        for (self.contexts.items, 0..) |it, i| {
            if (it == ctx) {
                _ = self.contexts.orderedRemove(i);
                alloc.destroy(it);
                return;
            }
        }
    }

    /// Removes and destroys the context whose uuid equals `uuid`, if any.
    pub fn deleteByUuid(self: *Self, uuid: []const u8) void {
        lock.lock();
        defer lock.unlock();

        for (self.contexts.items, 0..) |it, i| {
            // `uuid` is a slice and `it.uuid` an array: compare contents.
            if (memeql(u8, &it.uuid, uuid)) {
                _ = self.contexts.orderedRemove(i);
                alloc.destroy(it);
                return;
            }
        }
    }

    /// Closes and evicts every context whose `lastUpdate` is more than
    /// `stale_after_ms` behind the current millisecond timestamp.
    ///
    /// NOTE(review): the websocket handler of an evicted connection still holds
    /// its `*ServerContext`; this relies on the library not invoking `handle`
    /// again after `conn.close()` - confirm against the websocket package.
    pub fn checkServers(self: *Self) void {
        lock.lock();
        defer lock.unlock();

        const now = z.time.milliTimestamp();
        // Index-based sweep: removal shifts the tail down, so the index only
        // advances when the current element is kept.
        var i: usize = 0;
        while (i < self.contexts.items.len) {
            const it = self.contexts.items[i];
            if (now - stale_after_ms > it.status.lastUpdate) {
                z.debug.print("server {s} not responding for 20sec: disconnecting\n", .{it.status.domain});
                // Best effort: the peer is presumed dead, a failed close frame is expected.
                it.conn.writeClose() catch {};
                it.conn.close();
                _ = self.contexts.orderedRemove(i);
                alloc.destroy(it);
            } else {
                i += 1;
            }
        }
    }
};

/// Handler type given to `ws.Server.accept`; one instance per connection.
const WebsocketHandler = struct {
    mgr: *StateManager,
    ctx: *ServerContext,
    /// Parsed JSON backing the string slices currently stored in `ctx.status`.
    msg: ?u.JsonResponse(ServerStatus) = null,

    pub fn init(h: ws.Handshake, conn: *ws.Conn, mgr: *StateManager) !WebsocketHandler {
        _ = h;
        const ctx = try mgr.createContext(conn);
        return .{ .mgr = mgr, .ctx = ctx };
    }

    /// Parses one status message, stores it in the context and answers with a
    /// `{"msg":"ping"}` payload. Invalid JSON is logged and ignored.
    /// Returns `error.ConnectionClosed` once the server is shutting down.
    pub fn handle(self: *WebsocketHandler, data: Message) !void {
        lock.lock();
        defer lock.unlock();

        if (!is_listening) return error.ConnectionClosed;

        const status = u.jsonParseAlloc(ServerStatus, data.data, alloc) catch {
            z.debug.print("received invalid json from server {s}:\n{s}\n", .{ self.ctx.status.domain, data.data });
            return;
        };

        // The previous message owns the strings the old status points into;
        // it is released only now that a replacement is available.
        if (self.msg) |previous| previous.deinit();
        self.msg = status;
        self.ctx.status = status.v;
        self.ctx.status.msg = null;

        const reply = try u.jsonStringifyAlloc(.{ .msg = "ping" }, alloc);
        defer alloc.free(reply);
        try self.ctx.conn.write(reply);
    }

    /// Unregisters the context and releases the last parsed message.
    pub fn close(self: *WebsocketHandler) void {
        self.mgr.deleteContext(self.ctx);
        // The context is no longer reachable through `state`, so nothing can
        // still read the strings owned by the message.
        if (self.msg) |last| last.deinit();
        self.msg = null;
    }
};

var state: StateManager = undefined;

/// Reads `is_listening` under `lock`. Must not be called with `lock` held.
fn isListening() bool {
    lock.lock();
    defer lock.unlock();
    return is_listening;
}

/// Tears the websocket server down at most once, and only if it was initialized.
/// Must not be called with `lock` held.
fn deinitServerOnce() void {
    lock.lock();
    defer lock.unlock();
    if (!server_ready) return;
    server_ready = false;
    server.deinit(alloc);
}

/// Background loop: evicts stale servers, sends reminder emails and prints a
/// summary every `check_interval_ns`. After `stop()` clears `is_listening` it
/// waits one more interval, tears everything down and sets `stopped`.
fn checkLoop() void {
    while (isListening()) {
        state.checkServers();
        u.sendReminderEmails() catch |e| {
            z.debug.print("Error sending reminder emails {any} {any}\n", .{ e, @errorReturnTrace() });
        };

        lock.lock();
        z.debug.print("\x1b[1;32m[api] server count : {d}\n\x1b[0m", .{state.contexts.items.len});
        for (state.contexts.items) |ctx| {
            const loaded = ctx.status.loadedModel orelse "";
            const model_name: []const u8 = if (loaded.len > 0) loaded else "none";
            z.debug.print(" [{s}] model: \x1b[1;32m{s}\x1b[0m busy: \x1b[1;32m{any}\n\x1b[0m", .{ ctx.status.domain, model_name, ctx.status.isBusy });
        }
        lock.unlock();

        z.time.sleep(check_interval_ns);
    }

    // Grace period before the shared state is destroyed.
    z.time.sleep(check_interval_ns);
    deinitServerOnce();

    lock.lock();
    state.deinit();
    stopped = true;
    lock.unlock();

    z.debug.print("server stopped\n", .{});
}

/// Thread entry point: starts the check loop, initializes the websocket server
/// and accepts connections, handing each one to its own detached thread.
fn dispatchListen() !void {
    state = StateManager.init();

    const checker = z.Thread.spawn(.{}, checkLoop, .{}) catch |err| {
        z.log.err("failed to start server check loop: {s}", .{@errorName(err)});
        // Nothing will ever set `stopped` otherwise, and `stop()` would spin forever.
        lock.lock();
        stopped = true;
        lock.unlock();
        return err;
    };
    checker.detach();

    const cfg = ws.Config.Server{
        .port = @intCast(config.api_port),
        .address = "127.0.0.1",
        .max_headers = 10,
    };

    server = try ws.Server.init(alloc, cfg);
    lock.lock();
    server_ready = true;
    lock.unlock();
    // `checkLoop` also tears the server down; the guard makes that happen once.
    defer deinitServerOnce();

    var no_delay = true;
    const address = blk: {
        if (comptime builtin.os.tag != .windows) {
            if (cfg.unix_path) |unix_path| {
                no_delay = false;
                z.fs.deleteFileAbsolute(unix_path) catch {};
                break :blk try z.net.Address.initUnix(unix_path);
            }
        }
        break :blk try z.net.Address.parseIp(cfg.address, cfg.port);
    };

    var listener = try address.listen(.{
        .reuse_address = true,
        .kernel_backlog = 1024,
    });
    defer listener.deinit();

    if (no_delay) {
        // Option name 1 at the TCP level; presumably TCP_NODELAY - TODO confirm.
        try z.posix.setsockopt(listener.stream.handle, z.posix.IPPROTO.TCP, 1, &z.mem.toBytes(@as(c_int, 1)));
    }

    while (isListening()) {
        if (listener.accept()) |conn| {
            const args = .{ &server, WebsocketHandler, &state, conn.stream };
            const thread = z.Thread.spawn(.{}, ws.Server.accept, args) catch |err| {
                // Same policy as a failed accept: log, drop this connection, keep serving.
                z.log.err("failed to spawn connection thread {}", .{err});
                conn.stream.close();
                continue;
            };
            thread.detach();
        } else |err| {
            z.log.err("failed to accept connection {}", .{err});
        }
    }
}

/// Starts the listener on a detached background thread and returns immediately.
pub fn listen() !void {
    lock.lock();
    is_listening = true;
    lock.unlock();

    const thread = try z.Thread.spawn(.{}, dispatchListen, .{});
    thread.detach();
}

/// Requests shutdown, closes every registered connection and blocks until the
/// check loop has finished tearing the state down.
pub fn stop() void {
    lock.lock();
    z.debug.print("server stopping\n", .{});
    is_listening = false;
    for (state.contexts.items) |ctx| ctx.conn.close();
    lock.unlock();

    while (true) {
        lock.lock();
        const done = stopped;
        lock.unlock(); // always released, including on the final iteration
        if (done) break;
        z.Thread.yield() catch {};
    }
}

/// Returns the domain of a non-busy server, preferring the first one whose
/// loaded model equals `model`; falls back to the first non-busy server.
/// Returns null when no server is free or when duplicating the domain fails.
/// The returned string is allocated with `a` and owned by the caller.
pub fn getAvailableServer(model: ?[]const u8, a: z.mem.Allocator) ?[]const u8 {
    lock.lock();
    defer lock.unlock();

    var first_free: ?[]const u8 = null;
    var first_matched: ?[]const u8 = null;

    for (state.contexts.items) |ctx| {
        if (ctx.status.isBusy) continue;
        if (first_free == null) first_free = ctx.status.domain;

        // Without a requested model the first free server is already the answer.
        const wanted = model orelse break;
        const loaded = ctx.status.loadedModel orelse continue;
        if (memeql(u8, loaded, wanted)) {
            first_matched = ctx.status.domain;
            break;
        }
    }

    const domain = first_matched orelse first_free orelse return null;
    return a.dupe(u8, domain) catch null;
}

/// Marks the first server whose domain equals `name` as busy.
/// Returns `error.ServerNotFound` when no registered server has that domain.
pub fn markServerAsBusy(name: []const u8) !void {
    // Same lock as every other accessor of `state`.
    lock.lock();
    defer lock.unlock();

    for (state.contexts.items) |ctx| {
        if (memeql(u8, ctx.status.domain, name)) {
            ctx.status.isBusy = true;
            return;
        }
    }
    return error.ServerNotFound;
}