diff options
Diffstat (limited to 'backend/api/src/api.zig')
| -rw-r--r-- | backend/api/src/api.zig | 307 |
1 files changed, 307 insertions, 0 deletions
diff --git a/backend/api/src/api.zig b/backend/api/src/api.zig new file mode 100644 index 0000000..b5d69c0 --- /dev/null +++ b/backend/api/src/api.zig @@ -0,0 +1,307 @@ +const z = @import( "std" ); +const ws = @import( "websocket" ); +const req = @import( "req.zig" ); +const net = @import( "net-util.zig" ); +const config = @import( "config.zig" ); +const builtin = @import( "builtin" ); + +const alloc = z.heap.page_allocator; +var lock = z.Thread.Mutex{}; +var is_listening: bool = true; +var server: ws.Server = undefined; + +var stopped: bool = false; + +const u = struct { + usingnamespace @import( "util.zig" ); + usingnamespace @import( "userdefs.zig" ); + usingnamespace @import( "user.zig" ); + usingnamespace @import( "userdata.zig" ); +}; + +const ArrayList = z.ArrayList; +const ErrRes = net.ErrorResponse; +const OkRes = net.OkResponse; + +const Message = ws.Message; +const Handshake = ws.Handshake; + +const memeql = z.mem.eql; +const ServerStatus = struct { + lastUpdate: u64, + loadedModel: ?[]const u8 = null, + isBusy: bool, + domain: []const u8, + msg: ?[]const u8 = null +}; + +const ServerContext = struct { + status: ServerStatus, + conn: *ws.Conn, + uuid: [36]u8, +}; + +const StateManager = struct { + const Self = StateManager; + contexts: ArrayList(*ServerContext), + + pub fn init() Self { + var ret: Self = undefined; + ret.contexts = ArrayList(*ServerContext).init( alloc ); + return ret; + } + + pub fn deinit( self: *Self ) void { + for( self.contexts.items ) |ctx| { + alloc.destroy( ctx ); + } + self.contexts.deinit(); + } + + pub fn createContext( self: *Self ) !*ServerContext { + lock.lock(); + defer lock.unlock(); + + const ctx = try alloc.create( ServerContext ); + ctx.status.lastUpdate = 0; + ctx.status.loadedModel = ""; + ctx.status.isBusy = false; + ctx.status.domain = ""; + ctx.uuid = net.uuidv4(); + + try self.contexts.append( ctx ); + return ctx; + } + + pub fn deleteContext( self: *Self, ctx: *ServerContext ) void { + lock.lock(); + defer lock.unlock(); + for( self.contexts.items, 0..self.contexts.items.len ) |it, i| { + if( it == ctx ) { + alloc.destroy( it ); + _=self.contexts.orderedRemove( i ); + return; + } + } + } + + pub fn deleteByUuid( self: *Self, uuid: []const u8 ) void { + lock.lock(); + defer lock.unlock(); + for( self.contexts.items, 0..self.contexts.items.len ) |it, i| { + if( it.uuid == uuid ) { + alloc.destroy( it ); + _=self.contexts.orderedRemove( i ); + return; + } + } + } + + pub fn checkServers( self: *Self ) void { + lock.lock(); + defer lock.unlock(); + for( self.contexts.items ) |it| { + if( z.time.milliTimestamp() - 20000 > it.status.lastUpdate ) { + z.debug.print( "server {s} not responding for 20sec: disconnecting\n", .{ it.status.domain } ); + it.conn.writeClose() catch {}; + it.conn.close(); + lock.unlock(); + // locked again + self.deleteContext( it ); + lock.lock(); + } + } + } +}; + +const WebsocketHandler = struct { + mgr: *StateManager, + ctx: *ServerContext, + + msg: ?u.JsonResponse(ServerStatus) = null, + + pub fn init( h: ws.Handshake, conn: *ws.Conn, mgr: *StateManager ) !WebsocketHandler { + _ = h; + + const ctx = try mgr.createContext(); + ctx.conn = conn; + return WebsocketHandler{ + .mgr = mgr, + .ctx = ctx, + }; + } + + pub fn handle( self: *WebsocketHandler, data: Message ) !void { + lock.lock(); + defer lock.unlock(); + if( !is_listening ) + return error.ConnectionClosed; + + const status = u.jsonParseAlloc( ServerStatus, data.data, alloc ) catch { + z.debug.print( "received invalid json from server {s}:\n{s}\n", .{ self.ctx.status.domain, data.data } ); + return; + }; + if( self.msg ) |msg| + msg.deinit(); + self.msg = status; + + self.ctx.status = status.v; + self.ctx.status.msg = null; + + const reply = try u.jsonStringifyAlloc( .{ .msg = "ping" }, alloc ); + defer alloc.free( reply ); + try self.ctx.conn.write( reply ); + } + + pub fn close( self: *WebsocketHandler ) void { + self.mgr.deleteContext( self.ctx ); + } +}; + +var state: StateManager = undefined; + +fn checkLoop() void { + while( is_listening ) { + state.checkServers(); + u.sendReminderEmails() catch |e| { + z.debug.print( "Error sending reminder emails {any} {any}\n", .{e, @errorReturnTrace()} ); + }; + lock.lock(); + z.debug.print( "\x1b[1;32m[api] server count : {d}\n\x1b[0m", .{ state.contexts.items.len } ); + for( state.contexts.items ) |ctx| { + const status = ctx.status; + z.debug.print( " [{s}] model: \x1b[1;32m{s}\x1b[0m busy: \x1b[1;32m{any}\n\x1b[0m", .{ + status.domain, + if( status.loadedModel != null and status.loadedModel.?.len > 0 ) status.loadedModel.? else "none", + status.isBusy + } ); + } + lock.unlock(); + + // 5 sec. + z.time.sleep( 5000000000 ); + } + + z.time.sleep( 5000000000 ); + server.deinit( alloc ); + lock.lock(); + state.deinit(); + stopped = true; + lock.unlock(); + z.debug.print( "server stopped\n", .{} ); +} + +fn dispatchListen() !void { + state = StateManager.init(); + _= z.Thread.spawn( .{}, checkLoop, .{} ) catch return; + + const cfg = ws.Config.Server{ + .port = @intCast( config.api_port ), + .address = "127.0.0.1", + .max_headers = 10, + }; + + server = try ws.Server.init( alloc, cfg ); + defer server.deinit( alloc ); + + var no_delay = true; + const address = blk: { + if( comptime builtin.os.tag != .windows ) { + if( cfg.unix_path ) |unix_path| { + no_delay = false; + z.fs.deleteFileAbsolute( unix_path ) catch {}; + break :blk try z.net.Address.initUnix( unix_path ); + } + } + break :blk try z.net.Address.parseIp( cfg.address, cfg.port ); + }; + var listener = try address.listen( .{ + .reuse_address = true, + .kernel_backlog = 1024, + }); + defer listener.deinit(); + + if( no_delay ) { + try z.posix.setsockopt( listener.stream.handle, z.posix.IPPROTO.TCP, 1, &z.mem.toBytes( @as(c_int, 1) ) ); + } + + while( true ) { + lock.lock(); + if( !is_listening ) { + lock.unlock(); + break; + } + lock.unlock(); + + if( listener.accept() ) |conn| { + const args = .{ &server, WebsocketHandler, &state, conn.stream }; + const thread = try z.Thread.spawn( .{}, ws.Server.accept, args ); + thread.detach(); + } else |err| { + z.log.err( "failed to accept connection {}", .{err} ); + } + } +} + +pub fn listen() !void { + is_listening = true; + _=try z.Thread.spawn( .{}, dispatchListen, .{} ); +} + +pub fn stop() void { + lock.lock(); + z.debug.print( "server stopping\n", .{} ); + is_listening = false; + for( state.contexts.items ) |ctx| { + ctx.conn.close(); + } + lock.unlock(); + + while( true ) { + lock.lock(); + if( stopped ) + break; + lock.unlock(); + + z.Thread.yield() catch {}; + } +} + +///returned string owned by caller +pub fn getAvailableServer( model: ?[]const u8, a: z.mem.Allocator ) ?[]const u8 { + lock.lock(); + defer lock.unlock(); + + var first_free: ?[]const u8 = null; + var first_matched: ?[]const u8 = null; + for( state.contexts.items ) |ctx| { + if( ctx.status.isBusy ) + continue; + + if( first_free == null ) + first_free = ctx.status.domain; + if( model != null and first_matched == null and ctx.status.loadedModel != null ) { + if( memeql( u8, ctx.status.loadedModel.?, model.? ) ) + first_matched = ctx.status.domain; + } + + if( first_matched != null and first_free != null ) + break; + } + + if( first_matched == null and first_free == null ) + return null; + + return a.dupe( u8, first_matched orelse first_free.? ) catch null; +} + +pub fn markServerAsBusy( name: []const u8 ) !void { + for( state.contexts.items ) |*ctx| { + if( memeql( u8, ctx.*.status.domain, name ) ) { + ctx.*.status.isBusy = true; + return; + } + } + + return error.ServerNotFound; +} |
