From 50c7a3841c4f2edba2ceef4d570af86ba43b36dc Mon Sep 17 00:00:00 2001 From: XeonSquared Date: Tue, 8 Jan 2019 18:10:22 +1100 Subject: [PATCH] added a minitel stack, using the same code as the OpenOS version --- lib/event.lua | 42 +++++++ lib/minitel.lua | 167 +++++++++++++++++++++++++ service/minitel.lua | 298 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 507 insertions(+) create mode 100644 lib/event.lua create mode 100644 lib/minitel.lua create mode 100644 service/minitel.lua diff --git a/lib/event.lua b/lib/event.lua new file mode 100644 index 0000000..e9a90d5 --- /dev/null +++ b/lib/event.lua @@ -0,0 +1,42 @@ +local event = {} +function event.pull(t,...) + local tA = {...} + if type(t) == "string" then + table.insert(tA,1,t) + t = 0 + end + if not t or t <= 0 then + t = math.huge + end + local tE = computer.uptime()+t + repeat + tEv = {coroutine.yield()} + local ret = true + for i = 1, #tA do + if not (tEv[i] or ""):match(tA[i]) then + ret = false + end + end + if ret then return table.unpack(tEv) end + until computer.uptime() > tE + return nil +end + +function event.listen(e,f) + local op = os.getenv("parent") + os.setenv("parent",cPid) + os.spawn(function() while true do + local tEv = {coroutine.yield()} + if tEv[1] == e then + f(table.unpack(tEv)) + end + if not tTasks[os.getenv("parent")] or (tEv[1] == "unlisten" and tEv[2] == e and tEv[3] == tostring(f)) then break end + end end,string.format("[%d] %s listener",cPid,e)) + os.setenv("parent",op) +end + +function event.ignore(e,f) + computer.pushSignal("unlisten",e,tostring(f)) +end + +return event diff --git a/lib/minitel.lua b/lib/minitel.lua new file mode 100644 index 0000000..06dd271 --- /dev/null +++ b/lib/minitel.lua @@ -0,0 +1,167 @@ +local computer,event = computer,event +if _OSVERSION:sub(1,6) == "OpenOS" then + computer = require "computer" + event = require "event" +elseif _OSVERSION:sub(1,7) == "PsychOS" then + event = require "event" +end +local net = {} +net.mtu = 4096 +net.streamdelay = 30 +net.minport = 32768 +net.maxport = 65535 +net.openports = {} + +function net.genPacketID() + local npID = "" + for i = 1, 16 do + npID = npID .. string.char(math.random(32,126)) + end + return npID +end + +function net.usend(to,port,data,npID) + computer.pushSignal("net_send",0,to,port,data,npID) +end + +function net.rsend(to,port,data,block) + local pid, stime = net.genPacketID(), computer.uptime() + net.streamdelay + computer.pushSignal("net_send",1,to,port,data,pid) + if block then return false end + repeat + _,rpid = event.pull(0.5,"net_ack") + until rpid == pid or computer.uptime() > stime + if not rpid then return false end + return true +end + +-- ordered packet delivery, layer 4? + +function net.send(to,port,ldata) + local tdata = {} + if ldata:len() > net.mtu then + for i = 1, ldata:len(), net.mtu do + tdata[#tdata+1] = ldata:sub(1,net.mtu) + ldata = ldata:sub(net.mtu+1) + end + else + tdata = {ldata} + end + for k,v in ipairs(tdata) do + if not net.rsend(to,port,v) then return false end + end + return true +end + +-- socket stuff, layer 5? + +local function cwrite(self,data) + if self.state == "open" then + if not net.send(self.addr,self.port,data) then + self:close() + return false, "timed out" + end + end +end +local function cread(self,length) + length = length or "\n" + local rdata = "" + if type(length) == "number" then + rdata = self.rbuffer:sub(1,length) + self.rbuffer = self.rbuffer:sub(length+1) + return rdata + elseif type(length) == "string" then + if length:sub(1,2) == "*a" then + rdata = self.rbuffer + self.rbuffer = "" + return rdata + elseif length:len() == 1 then + local pre, post = self.rbuffer:match("(.-)"..length.."(.*)") + if pre and post then + self.rbuffer = post + return pre + end + return nil + end + end +end + +local function socket(addr,port,sclose) + local conn = {} + conn.addr,conn.port = addr,tonumber(port) + conn.rbuffer = "" + conn.write = cwrite + conn.read = cread + conn.state = "open" + conn.sclose = sclose + local function listener(_,f,p,d) + if f == conn.addr and p == conn.port then + if d == sclose then + conn:close() + else + conn.rbuffer = conn.rbuffer .. d + end + end + end + event.listen("net_msg",listener) + function conn.close(self) + event.ignore("net_msg",listener) + conn.state = "closed" + net.rsend(addr,port,sclose) + end + return conn +end + +function net.open(to,port) + if not net.rsend(to,port,"openstream") then return false, "no ack from host" end + local st = computer.uptime()+net.streamdelay + local est = false + while true do + _,from,rport,data = event.pull("net_msg") + if to == from and rport == port then + if tonumber(data) then + est = true + end + break + end + if st < computer.uptime() then + return nil, "timed out" + end + end + if not est then + return nil, "refused" + end + data = tonumber(data) + sclose = "" + repeat + _,from,nport,sclose = event.pull("net_msg") + until from == to and nport == data + return socket(to,data,sclose) +end + +function net.listen(port) + repeat + _, from, rport, data = event.pull("net_msg") + until rport == port and data == "openstream" + local nport = math.random(net.minport,net.maxport) + local sclose = net.genPacketID() + net.rsend(from,rport,tostring(nport)) + net.rsend(from,nport,sclose) + return socket(from,nport,sclose) +end + +function net.flisten(port,listener) + local function helper(_,from,rport,data) + if rport == port and data == "openstream" then + local nport = math.random(net.minport,net.maxport) + local sclose = net.genPacketID() + net.rsend(from,rport,tostring(nport)) + net.rsend(from,nport,sclose) + listener(socket(from,nport,sclose)) + end + end + event.listen("net_msg",helper) + return helper +end + +return net diff --git a/service/minitel.lua b/service/minitel.lua new file mode 100644 index 0000000..28d6af8 --- /dev/null +++ b/service/minitel.lua @@ -0,0 +1,298 @@ +--[[ +packet format: +packetID: random string to differentiate +packetType: + - 0: unreliable + - 1: reliable, requires ack + - 2: ack packet +destination: end destination hostname +sender: original sender of packet +data: the actual packet data, duh. +]]-- + +local listeners = {} +local modems = {} + +local cfg = {} +cfg.debug = false +cfg.port = 4096 +cfg.retry = 10 +cfg.retrycount = 64 +cfg.route = true + +local hostname = computer.address():sub(1,8) + +-- packet cache: [packet ID]=uptime +local pcache = {} +cfg.pctime = 30 + +--[[ +LKR format: +address { + local hardware address + remote hardware address + time last received +} +]]-- + +cfg.sroutes = {} +local rcache = setmetatable({},{__index=cfg.sroutes}) +cfg.rctime = 15 + +--[[ +packet queue format: +{ + packetID, + packetType + destination, + data, + timestamp, + attempts +} +]]-- +local pqueue = {} + +local function loadconfig() +end +local function saveconfig() +end + +-- specific OS support here +if _OSVERSION:sub(1,6) == "OpenOS" then -- OpenOS specific code + local timers = {} + + local event = require "event" + local component = require "component" + local computer = require "computer" + local serial = require "serialization" + local listener = false + + local function saveconfig() + local f = io.open("/etc/minitel.cfg","wb") + if f then + f:write(serial.serialize(cfg)) + f:close() + end + end + local function loadconfig() + local f=io.open("/etc/hostname","rb") + if f then + hostname = f:read() + f:close() + end + local f = io.open("/etc/minitel.cfg","rb") + if f then + local newcfg = serial.unserialize(f:read("*a")) + f:close() + for k,v in pairs(newcfg) do + cfg[k] = v + end + else + saveconfig() + end + end + function stop() + for k,v in pairs(listeners) do + event.ignore(k,v) + print("Stopped listener: "..tostring(v)) + end + for k,v in pairs(timers) do + event.cancel(v) + print("Stopped timer: "..tostring(v)) + end + end + + function set(k,v) + if type(cfg[k]) == "string" then + cfg[k] = v + elseif type(cfg[k]) == "number" then + cfg[k] = tonumber(v) + elseif type(cfg[k]) == "boolean" then + if v:lower():sub(1,1) == "t" then + cfg[k] = true + else + cfg[k] = false + end + end + print("cfg."..k.." = "..tostring(cfg[k])) + saveconfig() + end + + function set_route(to,laddr,raddr) + cfg.sroutes[to] = {laddr,raddr,0} + end + function del_route(to) + cfg.sroutes[to] = nil + end +end + +local function dprint(...) + if cfg.debug then + print(...) + end +end + + +function start() + loadconfig() + print("Hostname: "..hostname) + if listener then return end + modems={} + for a,t in component.list("modem") do + modems[#modems+1] = component.proxy(a) + end + for k,v in ipairs(modems) do + v.open(cfg.port) + print("Opened port "..cfg.port.." on "..v.address:sub(1,8)) + end + for a,t in component.list("tunnel") do + modems[#modems+1] = component.proxy(a) + end + + local function genPacketID() + local npID = "" + for i = 1, 16 do + npID = npID .. string.char(math.random(32,126)) + end + return npID + end + + local function sendPacket(packetID,packetType,dest,sender,vPort,data) + if rcache[dest] then + dprint("Cached", rcache[dest][1],"send",rcache[dest][2],cfg.port,packetID,packetType,dest,sender,vPort,data) + if component.type(rcache[dest][1]) == "modem" then + component.invoke(rcache[dest][1],"send",rcache[dest][2],cfg.port,packetID,packetType,dest,sender,vPort,data) + elseif component.type(rcache[dest][1]) == "tunnel" then + component.invoke(rcache[dest][1],"send",packetID,packetType,dest,sender,vPort,data) + end + else + dprint("Not cached", cfg.port,packetID,packetType,dest,sender,vPort,data) + for k,v in pairs(modems) do + if v.type == "modem" then + v.broadcast(cfg.port,packetID,packetType,dest,sender,vPort,data) + elseif v.type == "tunnel" then + v.send(packetID,packetType,dest,sender,vPort,data) + end + end + end + end + + local function pruneCache() + for k,v in pairs(rcache) do + dprint(k,v[3],computer.uptime()) + if v[3] < computer.uptime() then + rcache[k] = nil + dprint("pruned "..k.." from routing cache") + end + end + for k,v in pairs(pcache) do + if v < computer.uptime() then + pcache[k] = nil + dprint("pruned "..k.." from packet cache") + end + end + end + + local function checkPCache(packetID) + dprint(packetID) + for k,v in pairs(pcache) do + dprint(k) + if k == packetID then return true end + end + return false + end + + local function processPacket(_,localModem,from,pport,_,packetID,packetType,dest,sender,vPort,data) + pruneCache() + if pport == cfg.port or pport == 0 then -- for linked cards + dprint(cfg.port,vPort,packetType,dest) + if checkPCache(packetID) then return end + if dest == hostname then + if packetType == 1 then + sendPacket(genPacketID(),2,sender,hostname,vPort,packetID) + end + if packetType == 2 then + dprint("Dropping "..data.." from queue") + pqueue[data] = nil + computer.pushSignal("net_ack",data) + end + if packetType ~= 2 then + computer.pushSignal("net_msg",sender,vPort,data) + end + elseif dest:sub(1,1) == "~" then -- broadcasts start with ~ + computer.pushSignal("net_broadcast",sender,vPort,data) + elseif cfg.route then -- repeat packets if route is enabled + sendPacket(packetID,packetType,dest,sender,vPort,data) + end + if not rcache[sender] then -- add the sender to the rcache + dprint("rcache: "..sender..":", localModem,from,computer.uptime()) + rcache[sender] = {localModem,from,computer.uptime()+cfg.rctime} + end + if not pcache[packetID] then -- add the packet ID to the pcache + pcache[packetID] = computer.uptime()+cfg.pctime + end + end + end + + local function queuePacket(_,ptype,to,vPort,data,npID) + npID = npID or genPacketID() + if to == hostname or to == "localhost" then + computer.pushSignal("net_msg",to,vPort,data) + computer.pushSignal("net_ack",npID) + return + end + pqueue[npID] = {ptype,to,vPort,data,0,0} + dprint(npID,table.unpack(pqueue[npID])) + end + + + local function packetPusher() + for k,v in pairs(pqueue) do + if v[5] < computer.uptime() then + dprint(k,v[1],v[2],hostname,v[3],v[4]) + sendPacket(k,v[1],v[2],hostname,v[3],v[4]) + if v[1] ~= 1 or v[6] == cfg.retrycount then + pqueue[k] = nil + else + pqueue[k][5]=computer.uptime()+cfg.retry + pqueue[k][6]=pqueue[k][6]+1 + end + end + end + end + + listeners["modem_message"]=processPacket + listeners["net_send"]=queuePacket + if _OSVERSION:sub(1,6) == "OpenOS" then + event.listen("modem_message",processPacket) + print("Started packet listening daemon: "..tostring(processPacket)) + event.listen("net_send",queuePacket) + print("Started packet queueing daemon: "..tostring(queuePacket)) + timers[#timers+1]=event.timer(0,packetPusher,math.huge) + print("Started packet pusher: "..tostring(timers[#timers])) + end + if _OSVERSION:sub(1,8) == "KittenOS" then + neo.requireAccess("r.svc.minitel","minitel daemon")(function(pkg,pid,sendSig) + processes[pid] = sendSig + return {["sendPacket"]=queuePacket} + end) + end + + if _OSVERSION:sub(1,8) == "KittenOS" or _OSVERSION:sub(1,7) == "PsychOS" then + while true do + local ev = {coroutine.yield()} + packetPusher() + pruneCache() + if ev[1] == "k.procdie" then + processes[ev[3]] = nil + end + if listeners[ev[1]] then + pcall(listeners[ev[1]],table.unpack(ev)) + end + end + end +end + +if _OSVERSION:sub(1,6) ~= "OpenOS" then + start() +end