summaryrefslogtreecommitdiff
path: root/backend/api/src/api.zig
diff options
context:
space:
mode:
Diffstat (limited to 'backend/api/src/api.zig')
-rw-r--r--backend/api/src/api.zig307
1 files changed, 307 insertions, 0 deletions
diff --git a/backend/api/src/api.zig b/backend/api/src/api.zig
new file mode 100644
index 0000000..b5d69c0
--- /dev/null
+++ b/backend/api/src/api.zig
@@ -0,0 +1,307 @@
+const z = @import( "std" );
+const ws = @import( "websocket" );
+const req = @import( "req.zig" );
+const net = @import( "net-util.zig" );
+const config = @import( "config.zig" );
+const builtin = @import( "builtin" );
+
+const alloc = z.heap.page_allocator;
+var lock = z.Thread.Mutex{};
+var is_listening: bool = true;
+var server: ws.Server = undefined;
+
+var stopped: bool = false;
+
+const u = struct {
+ usingnamespace @import( "util.zig" );
+ usingnamespace @import( "userdefs.zig" );
+ usingnamespace @import( "user.zig" );
+ usingnamespace @import( "userdata.zig" );
+};
+
+const ArrayList = z.ArrayList;
+const ErrRes = net.ErrorResponse;
+const OkRes = net.OkResponse;
+
+const Message = ws.Message;
+const Handshake = ws.Handshake;
+
+const memeql = z.mem.eql;
+const ServerStatus = struct {
+ lastUpdate: u64,
+ loadedModel: ?[]const u8 = null,
+ isBusy: bool,
+ domain: []const u8,
+ msg: ?[]const u8 = null
+};
+
+const ServerContext = struct {
+ status: ServerStatus,
+ conn: *ws.Conn,
+ uuid: [36]u8,
+};
+
+const StateManager = struct {
+ const Self = StateManager;
+ contexts: ArrayList(*ServerContext),
+
+ pub fn init() Self {
+ var ret: Self = undefined;
+ ret.contexts = ArrayList(*ServerContext).init( alloc );
+ return ret;
+ }
+
+ pub fn deinit( self: *Self ) void {
+ for( self.contexts.items ) |ctx| {
+ alloc.destroy( ctx );
+ }
+ self.contexts.deinit();
+ }
+
+ pub fn createContext( self: *Self ) !*ServerContext {
+ lock.lock();
+ defer lock.unlock();
+
+ const ctx = try alloc.create( ServerContext );
+ ctx.status.lastUpdate = 0;
+ ctx.status.loadedModel = "";
+ ctx.status.isBusy = false;
+ ctx.status.domain = "";
+ ctx.uuid = net.uuidv4();
+
+ try self.contexts.append( ctx );
+ return ctx;
+ }
+
+ pub fn deleteContext( self: *Self, ctx: *ServerContext ) void {
+ lock.lock();
+ defer lock.unlock();
+ for( self.contexts.items, 0..self.contexts.items.len ) |it, i| {
+ if( it == ctx ) {
+ alloc.destroy( it );
+ _=self.contexts.orderedRemove( i );
+ return;
+ }
+ }
+ }
+
+ pub fn deleteByUuid( self: *Self, uuid: []const u8 ) void {
+ lock.lock();
+ defer lock.unlock();
+ for( self.contexts.items, 0..self.contexts.items.len ) |it, i| {
+ if( it.uuid == uuid ) {
+ alloc.destroy( it );
+ _=self.contexts.orderedRemove( i );
+ return;
+ }
+ }
+ }
+
+ pub fn checkServers( self: *Self ) void {
+ lock.lock();
+ defer lock.unlock();
+ for( self.contexts.items ) |it| {
+ if( z.time.milliTimestamp() - 20000 > it.status.lastUpdate ) {
+ z.debug.print( "server {s} not responding for 20sec: disconnecting\n", .{ it.status.domain } );
+ it.conn.writeClose() catch {};
+ it.conn.close();
+ lock.unlock();
+ // locked again
+ self.deleteContext( it );
+ lock.lock();
+ }
+ }
+ }
+};
+
+const WebsocketHandler = struct {
+ mgr: *StateManager,
+ ctx: *ServerContext,
+
+ msg: ?u.JsonResponse(ServerStatus) = null,
+
+ pub fn init( h: ws.Handshake, conn: *ws.Conn, mgr: *StateManager ) !WebsocketHandler {
+ _ = h;
+
+ const ctx = try mgr.createContext();
+ ctx.conn = conn;
+ return WebsocketHandler{
+ .mgr = mgr,
+ .ctx = ctx,
+ };
+ }
+
+ pub fn handle( self: *WebsocketHandler, data: Message ) !void {
+ lock.lock();
+ defer lock.unlock();
+ if( !is_listening )
+ return error.ConnectionClosed;
+
+ const status = u.jsonParseAlloc( ServerStatus, data.data, alloc ) catch {
+ z.debug.print( "received invalid json from server {s}:\n{s}\n", .{ self.ctx.status.domain, data.data } );
+ return;
+ };
+ if( self.msg ) |msg|
+ msg.deinit();
+ self.msg = status;
+
+ self.ctx.status = status.v;
+ self.ctx.status.msg = null;
+
+ const reply = try u.jsonStringifyAlloc( .{ .msg = "ping" }, alloc );
+ defer alloc.free( reply );
+ try self.ctx.conn.write( reply );
+ }
+
+ pub fn close( self: *WebsocketHandler ) void {
+ self.mgr.deleteContext( self.ctx );
+ }
+};
+
+var state: StateManager = undefined;
+
+fn checkLoop() void {
+ while( is_listening ) {
+ state.checkServers();
+ u.sendReminderEmails() catch |e| {
+ z.debug.print( "Error sending reminder emails {any} {any}\n", .{e, @errorReturnTrace()} );
+ };
+ lock.lock();
+ z.debug.print( "\x1b[1;32m[api] server count : {d}\n\x1b[0m", .{ state.contexts.items.len } );
+ for( state.contexts.items ) |ctx| {
+ const status = ctx.status;
+ z.debug.print( " [{s}] model: \x1b[1;32m{s}\x1b[0m busy: \x1b[1;32m{any}\n\x1b[0m", .{
+ status.domain,
+ if( status.loadedModel != null and status.loadedModel.?.len > 0 ) status.loadedModel.? else "none",
+ status.isBusy
+ } );
+ }
+ lock.unlock();
+
+ // 5 sec.
+ z.time.sleep( 5000000000 );
+ }
+
+ z.time.sleep( 5000000000 );
+ server.deinit( alloc );
+ lock.lock();
+ state.deinit();
+ stopped = true;
+ lock.unlock();
+ z.debug.print( "server stopped\n", .{} );
+}
+
+fn dispatchListen() !void {
+ state = StateManager.init();
+ _= z.Thread.spawn( .{}, checkLoop, .{} ) catch return;
+
+ const cfg = ws.Config.Server{
+ .port = @intCast( config.api_port ),
+ .address = "127.0.0.1",
+ .max_headers = 10,
+ };
+
+ server = try ws.Server.init( alloc, cfg );
+ defer server.deinit( alloc );
+
+ var no_delay = true;
+ const address = blk: {
+ if( comptime builtin.os.tag != .windows ) {
+ if( cfg.unix_path ) |unix_path| {
+ no_delay = false;
+ z.fs.deleteFileAbsolute( unix_path ) catch {};
+ break :blk try z.net.Address.initUnix( unix_path );
+ }
+ }
+ break :blk try z.net.Address.parseIp( cfg.address, cfg.port );
+ };
+ var listener = try address.listen( .{
+ .reuse_address = true,
+ .kernel_backlog = 1024,
+ });
+ defer listener.deinit();
+
+ if( no_delay ) {
+ try z.posix.setsockopt( listener.stream.handle, z.posix.IPPROTO.TCP, 1, &z.mem.toBytes( @as(c_int, 1) ) );
+ }
+
+ while( true ) {
+ lock.lock();
+ if( !is_listening ) {
+ lock.unlock();
+ break;
+ }
+ lock.unlock();
+
+ if( listener.accept() ) |conn| {
+ const args = .{ &server, WebsocketHandler, &state, conn.stream };
+ const thread = try z.Thread.spawn( .{}, ws.Server.accept, args );
+ thread.detach();
+ } else |err| {
+ z.log.err( "failed to accept connection {}", .{err} );
+ }
+ }
+}
+
+pub fn listen() !void {
+ is_listening = true;
+ _=try z.Thread.spawn( .{}, dispatchListen, .{} );
+}
+
+pub fn stop() void {
+ lock.lock();
+ z.debug.print( "server stopping\n", .{} );
+ is_listening = false;
+ for( state.contexts.items ) |ctx| {
+ ctx.conn.close();
+ }
+ lock.unlock();
+
+ while( true ) {
+ lock.lock();
+ if( stopped )
+ break;
+ lock.unlock();
+
+ z.Thread.yield() catch {};
+ }
+}
+
+///returned string owned by caller
+pub fn getAvailableServer( model: ?[]const u8, a: z.mem.Allocator ) ?[]const u8 {
+ lock.lock();
+ defer lock.unlock();
+
+ var first_free: ?[]const u8 = null;
+ var first_matched: ?[]const u8 = null;
+ for( state.contexts.items ) |ctx| {
+ if( ctx.status.isBusy )
+ continue;
+
+ if( first_free == null )
+ first_free = ctx.status.domain;
+ if( model != null and first_matched == null and ctx.status.loadedModel != null ) {
+ if( memeql( u8, ctx.status.loadedModel.?, model.? ) )
+ first_matched = ctx.status.domain;
+ }
+
+ if( first_matched != null and first_free != null )
+ break;
+ }
+
+ if( first_matched == null and first_free == null )
+ return null;
+
+ return a.dupe( u8, first_matched orelse first_free.? ) catch null;
+}
+
+pub fn markServerAsBusy( name: []const u8 ) !void {
+ for( state.contexts.items ) |*ctx| {
+ if( memeql( u8, ctx.*.status.domain, name ) ) {
+ ctx.*.status.isBusy = true;
+ return;
+ }
+ }
+
+ return error.ServerNotFound;
+}