From a8ad2470dba504baad8e855a0d6c9e107e7e5dc6 Mon Sep 17 00:00:00 2001 From: kousu Date: Wed, 3 Sep 2014 20:28:21 -0400 Subject: [PATCH 01/41] Clean up replicate_server. Now it is restartable without having to use pkill or dig through your system monitor. I should have done this immediately. --- scratch/psql/replicate_server.sh | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/scratch/psql/replicate_server.sh b/scratch/psql/replicate_server.sh index 17f1d41..e0cf71f 100755 --- a/scratch/psql/replicate_server.sh +++ b/scratch/psql/replicate_server.sh @@ -21,10 +21,20 @@ SERVER="./replicate.sh ${TABLE}" # TODO: name this better / use mkstemp / something IPC=/tmp/s_websockify_replicate_$table + +# terminate all children (websockify, etc) on exit +# tip from http://stackoverflow.com/questions/360201/kill-background-process-when-shell-script-exit +#trap 'kill $(jobs -p)' EXIT #?? doesn't work + + # start up websockify, daemonized, waiting for connections to proxy to socat -websockify -D $PORT --unix-target=$IPC +websockify $PORT --unix-target=$IPC & #NB: it is important; it might be more robust to just give up on bash and use subprocess.py instead... +WEBSOCKIFY=$! # start up socat, proxying TCP to the replication server # reusaddr gets around lingering (e.g. TIME_WAIT) connections blocking the new bind(), # so that you can stop and restart this script immediately socat UNIX-LISTEN:$IPC,fork,reuseaddr EXEC:"$SERVER" +# we don't background socat because we need something to hold this script open + +kill $WEBSOCKIFY From 3c78c0c3743981d3d27a048a50e83998631b3679 Mon Sep 17 00:00:00 2001 From: kousu Date: Wed, 3 Sep 2014 21:06:51 -0400 Subject: [PATCH 02/41] Debugging why unregister() doesn't get called when clients leave. forkit.py demonstrates that socat does indeed shut down EXEC processes when the socket dies (tested with both TCP and Unix Domain). So something about replicate.py is holding it open indefinitely.
In fact, replicate.py can outlive socat. I could rewrite replicate.py as a socket app itself, and in it select() on both the input socket and the output socket, but that's a lot of work and it might not even be the proper bug. --- scratch/psql/experiments/forkit.py | 44 ++++++++++++++++++++++++++++++ scratch/psql/replicate.py | 18 ++++++++++-- 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 scratch/psql/experiments/forkit.py diff --git a/scratch/psql/experiments/forkit.py b/scratch/psql/experiments/forkit.py new file mode 100644 index 0000000..989c336 --- /dev/null +++ b/scratch/psql/experiments/forkit.py @@ -0,0 +1,44 @@ + +# socat with the "fork" option holds open programs even after they've died +# maybe I need to check is sys.stdout exists???? + +import sys +import uuid + +import threading, time +import atexit + +I = uuid.uuid4() +i = 0 + +def alive(): + global i + while True: + #a = input() + #print(a*2) + sys.stderr.write(str(I)+":"+str(i)) + sys.stderr.write("\n") + sys.stderr.flush() + time.sleep(2) + i += 1 + +T = threading.Thread(target=alive); +T.start() + +def q(): + print("quittign") + sys.stderr.write("stderr::quitting\n"); sys.stderr.flush() +atexit.register(q) + +# used with socat's EXEC address, e.g. +# socat UNIX-LISTEN:/tmp/sw,fork,reuseaddr EXEC:"python forkit.py" +# socat TCP-LISTEN:7777,fork,reuseaddr EXEC:"python forkit.py" +# and then connected to +# this is indeed shutdown properly by socat when the client disconnects. +# However, for some reason replicate.py doesn't shutdown. +# + +while True: + time.sleep(1) + + diff --git a/scratch/psql/replicate.py b/scratch/psql/replicate.py index 3d87db5..4e3b5d9 100755 --- a/scratch/psql/replicate.py +++ b/scratch/psql/replicate.py @@ -99,6 +99,8 @@ def __enter__(self): return self #oops! 
def __exit__(self, *args): + print("__exit__") + # unregister ourselves # XXX SQL injection here C = E.connect() @@ -183,15 +185,27 @@ def replicate(_table): yield delta # NOTREACHED (unless something crashes, the changes feed should be infinite, and a crash would crash before this line anyway) - +import threading + +def alivethread(): + import time + while True: + sys.stderr.write("what's up?") + sys.stderr.flush() + time.sleep(3) + if __name__ == '__main__': import sys table = sys.argv[1] + T = threading.Thread(target=alivethread) + T.start() + for delta in replicate(table): #print(delta, flush=True) #py3 - print delta; sys.stdout.flush() #py2 + print (delta); sys.stdout.flush() #py2/3 + sys.stderr.write("hello \n") # NOTREACHED From d0aed5701e9ff318ed04b8653abe940abd3c3614 Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 00:04:24 -0400 Subject: [PATCH 03/41] Salvage socat's habit of leaving replicate.py's around. Figuring out how to get around this took a long time, and my solution is not as elegant as I would like. It came down to misusing a socketpair() like a Unix signal, because our spoolers block at select() already, but I wanted to have them stop blocking until input OR a "signal" comes, so I needed to use something that had a file descriptor behind it. I also now spin off subsidiary threads simply because I wanted to block on two totally separate file descriptors in parallel and I could either rewrite my select() loops to be one single loop--which breaks encapsulation and just feels messy--or do this, which also feels messy but at least maintains encapsulation.
--- scratch/psql/experiments/forkit.py | 27 ++++++++++++ scratch/psql/replicate.py | 70 ++++++++++++++++++++++-------- 2 files changed, 78 insertions(+), 19 deletions(-) diff --git a/scratch/psql/experiments/forkit.py b/scratch/psql/experiments/forkit.py index 989c336..f5e7cf4 100644 --- a/scratch/psql/experiments/forkit.py +++ b/scratch/psql/experiments/forkit.py @@ -37,6 +37,33 @@ def q(): # this is indeed shutdown properly by socat when the client disconnects. # However, for some reason replicate.py doesn't shutdown. # +# I know what's the problem: +# this is being hard-killed (-9'd, or something), and so the process simply dies without having a change to clean up after itself. +# The way I've written the code, it only calls Changes.__exit__() (--> so therefore calls unregister()) +# if an exception happens *within* replicate--and not just that, +# but within the thread doing the spooling. +# +# ...so how do I fix this? +# I could trap the exit signal maybe, but that seems like a patchwork solution and won't run on the right thread +# I could have a monitor thread watching (via select(), even) sys.stdin or sys.stdout and looking for EOF --- I've got this written and it reliably catches the quit--and then raise a signal to trigger the main loop to quit +# ^ this is flakey, because there's no guarantee this thread will run between the socket closing and the kill coming +# I could restructure the code so that the spooling loop select()s on *both* the input (unix domain socket from inside of postgres) and output (stdout) streams. Then, when +# WORKING BUT AWKWARD SOLUTION: set end-close on the EXEC in socat; this makes socat forgo the; then, do the restructuring to watch for stdin falling over. +# CURRENTLY AWKWARD because I've written +# for delta in replicate(table): +# if select.select([sys.stdin], [], [],0)[0]: +# so replicants don't find out they should die until the next time a delta comes from the database. 
This isn't the end of the world, but it certainly provides a way (for an attacker?) to chew up resources if we also allow clients to indirectly control writes to the DB: open and close the client page a million times; this will spawn a million replicates which won't ever close +# I think the only other ways to detect the client dying are +# - rewrite as a socket app and check explicitly +# - use signal +# the monitoring thread is essentially no better than restructuring the code; the code will *still* have to be +# # Oh! If it is important that we quit, then make the main thread the one monitoring stdin +# and put +# hm but really +# ..oh, except.. well... we're using datagrams on the input side so we don't have... + +# This is additionally made extra complicated by replicate.sh, since processes do not take their children with them by default +# --> it would be better if replicate.sh could be avoided -- all it does is set an environment var, and then only on OS X while True: time.sleep(1) diff --git a/scratch/psql/replicate.py b/scratch/psql/replicate.py index 4e3b5d9..a406030 100755 --- a/scratch/psql/replicate.py +++ b/scratch/psql/replicate.py @@ -73,9 +73,10 @@ class Changes: MTU = 2048 #maximum bytes to read per message - def __init__(self, table): #TODO: support where clauses + def __init__(self, table, ctl_sock): #TODO: support where clauses self._table = table self._stream_id = None + self._ctl_sock = ctl_sock def __enter__(self): # set up our listening socket @@ -99,7 +100,6 @@ def __enter__(self): return self #oops! 
def __exit__(self, *args): - print("__exit__") # unregister ourselves # XXX SQL injection here @@ -117,10 +117,13 @@ def __iter__(self): return self def __next__(self): - fread, fwrite, ferr = select.select([self._sock], [], [self._sock]) #block until some data is available + fread, fwrite, ferr = select.select([self._sock, self._ctl_sock], [], [self._sock, self._ctl_sock]) #block until some data is available if ferr: pass #XXX else: + if self._ctl_sock in fread: + if not self._ctl_sock.recv(1): + raise StopIteration pkt = self._sock.recv(Changes.MTU) pkt = pkt.decode('utf-8') return pkt @@ -128,8 +131,11 @@ def __next__(self): next = __next__ #py2 compatibility -def replicate(_table): +def replicate(_table, ctl_sock): + """ + ctl_sock should be a socket that + """ # XXX we might need to construct the Changes stream first # If we do have concurrency problems, doing that at least guarantees that we don't miss any changes, though we might end up with duplicate rows or trying to delete nonexistent rows @@ -145,7 +151,7 @@ def replicate(_table): # this is sort of tricky # I need to say somethign like - with Changes(_table) as changes: #<-- use with to get the benefits of RAII, since Changes has a listening endpoint to worry about cleaning up + with Changes(_table, ctl_sock) as changes: #<-- use with to get the benefits of RAII, since Changes has a listening endpoint to worry about cleaning up # README: BUGFIX: the change to raw_connection() caused a deadlock which only occurs the first time register() is called: register() needs to create a trigger on _table, but cur holds a lock on _table # it seems, however, that reordering the instructions avoids the deadlock @@ -169,6 +175,7 @@ def replicate(_table): keys = [col.name for col in cur.description] #low level SQLAlchemy (psycopg2, in this case) #keys = cur.keys() #SQLAlchemy for row in cur: + if select.select([ctl_sock], [], [],0)[0]: break #note: the ",0" is key here! 
it means "do polling" instead of "do blocking" row = dict(zip(keys, row)) #coerce the SQLAlchemy row format to a dictionary delta = {"+": row} #convert row to our made up delta format; the existing rows can all be considered inserts delta = json.dumps(delta) #and then to JSON @@ -184,28 +191,53 @@ def replicate(_table): # we assume that the source (watch_table()) has already jsonified things for us; THIS MIGHT BE A MISTAKE yield delta # NOTREACHED (unless something crashes, the changes feed should be infinite, and a crash would crash before this line anyway) - + + import threading -def alivethread(): - import time - while True: - sys.stderr.write("what's up?") - sys.stderr.flush() - time.sleep(3) +# have one thread, the alive thread, watching for (this can just +# and the Repl +def replicatethread(table, ctl_sock): + ctl, ctl_slave = ctl_sock + try: + for delta in replicate(table, ctl_slave): + #print(delta, flush=True) #py3 + print (delta); sys.stdout.flush() #py2/3 + finally: + ctl.close() # NB: unix doesn't care if a socket is closed multiple times + +# TODO: use the signal module to trap things that might eat us and rewrite them as "ctl.close()" so that cleanup can happen properly if __name__ == '__main__': import sys table = sys.argv[1] - T = threading.Thread(target=alivethread) - T.start() + ctl, ctl_slave = socket.socketpair() #when this socket closes all spools (all two of them, but it could be generalized) should shut down immediately + + RT = threading.Thread(target=replicatethread, args=(table, (ctl, ctl_slave))) + RT.start() - for delta in replicate(table): - #print(delta, flush=True) #py3 - print (delta); sys.stdout.flush() #py2/3 - sys.stderr.write("hello \n") + # TODO: for symmetry, there should be an "AT" (alivethread) which does the stdin checking + # and main() should simply .join() all the threads, and assume that they'll inter-signal each other and shutdown cleanly + # XXX speaking of signals, what if I used them? 
It would be more unixey, in one sense. I would probably still need a socketpair() call in order to be able to map a signal into something I can select() on - # NOTREACHED + try: + while True: + fread, fwrite, ferr = select.select([sys.stdin, ctl_slave], [], []) + #print("alivethread:", (fread, fwrite, ferr), file=sys.stderr) + if sys.stdin in fread: + #print("alivethread:", "break stdin",file=sys.stderr) + if sys.stdin.read(1) == "": + break + if ctl_slave in fread: + #print("alivethread:", "break ctl",file=sys.stderr) + if ctl_slave.recv(1) == "": + break + #if sys.stdout in ferr: + # print("alivethread:", "break stdout",file=sys.stderr) + # break + finally: + ctl.close() + RT.join() #give the replication a chanc eto clean up after itself \ No newline at end of file From 26da4934276b5df3a1696f03e4001f993d813290 Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 00:32:08 -0400 Subject: [PATCH 04/41] Minor SimulationLog safety checks --- src/models/eutopia/simulationlog.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/models/eutopia/simulationlog.py b/src/models/eutopia/simulationlog.py index baa99f7..ba5364d 100644 --- a/src/models/eutopia/simulationlog.py +++ b/src/models/eutopia/simulationlog.py @@ -75,6 +75,9 @@ def __new__(cls, parent, name, *schema, **kwargs): # works on sets and is ignorant of order, but sqlalchemy isn't, and csv isn't. assert isinstance(parent, SimulationLog), "SimulationTable only works with SimulationLogs." 
+ if any(isinstance(c, Column) and c.name == "run_id" for c in schema): + raise ValueError("run_id is a reserved column name in SimulationTables") + schema = (Column("run_id", Integer, ForeignKey("runs.id"), primary_key=True, default=parent.run_id),) + schema #the default here is a constant and *private* # TODO: use ForeignKey(parent.runs_table.c.id) instead of a string, for stronger typing win @@ -242,6 +245,10 @@ def __new__(cls, parent, name, *schema, **kwargs): # prefix the table by 'time'; note that the time is pulled, via closure, from the parent ModelLog object #assert isinstance(parent, TimestepLog), "TimestepTable only works with TimestepLogs." assert hasattr(parent, 'time'), "TimestepTable only works with TimestepLogs." #looser, duck-typed precondition + + if any(isinstance(c, Column) and c.name == "time" for c in schema): + raise ValueError("time is a reserved column name in TimestepTables") + schema = (Column("time", Integer, ForeignKey("timesteps.time"), primary_key=True, default=lambda: parent.time), ) + schema From d0582f56954d795f6bfd6029f1e2bb18118e8726 Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 01:14:46 -0400 Subject: [PATCH 05/41] minor clarity --- scratch/psql/replicate.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/scratch/psql/replicate.py b/scratch/psql/replicate.py index a406030..674ba3f 100755 --- a/scratch/psql/replicate.py +++ b/scratch/psql/replicate.py @@ -175,7 +175,11 @@ def replicate(_table, ctl_sock): keys = [col.name for col in cur.description] #low level SQLAlchemy (psycopg2, in this case) #keys = cur.keys() #SQLAlchemy for row in cur: - if select.select([ctl_sock], [], [],0)[0]: break #note: the ",0" is key here! it means "do polling" instead of "do blocking" + # fail-fast if we've been told to shutdown + if select.select([ctl_sock], [], [],0)[0]: # the ",0" is key here! 
it means "do polling" instead of "do blocking" + break + + # spool the next row row = dict(zip(keys, row)) #coerce the SQLAlchemy row format to a dictionary delta = {"+": row} #convert row to our made up delta format; the existing rows can all be considered inserts delta = json.dumps(delta) #and then to JSON From ff01d36543b6f8b34235c5eb6b308742e6481a2e Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 01:15:02 -0400 Subject: [PATCH 06/41] clean up the postgres startup scripts somewhat; 'init.sh' installs the replication hooks for you So now, installing this system should look something like Get the software $ pkg_add postgres python-psycopg2 python-sqlalchemy $ git clone modex $ cd modex/ Set up the DB $ cd src/backend/db $ ./init.sh eutopia #or other model of your choice $ ./server.sh in a different session, set up the simulation $ cd src/backend/models/eutopia $ ./eutopia postgresql://localhost/eutopia #but you can use "sqlite:///eutopia.sqlite" if you are having problems with postgres in a third and fourth and fifth session, set up the web clients $ python -m SimpleHTTPServer $ cd src/backend/db; ./replicate_server 8081 postgresql://localhost/eutopia activity_counts #<-- this part is still dodgy $ browser http://localhost:8000/src/frontend/demo.html AND IF THIS DOESN'T WORK AS PROMISED IT'S A BUG --- scratch/psql/client.sh | 21 +++++++++++++-------- scratch/psql/init.sh | 40 ++++++++++++++++++++++++++++++++++++++++ scratch/psql/server.sh | 6 +++++- 3 files changed, 58 insertions(+), 9 deletions(-) create mode 100755 scratch/psql/init.sh diff --git a/scratch/psql/client.sh b/scratch/psql/client.sh index 15d30ce..d1973a2 100755 --- a/scratch/psql/client.sh +++ b/scratch/psql/client.sh @@ -1,13 +1,18 @@ #!/usr/bin/env bash - +# usage: ./client.sh [dbname] +# launches psql pointed at the db +# server.sh must be running for this to work pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null cd $HERE -psql -d postgres -h 127.0.0.1 -# -h "$HERE" - #
initdb makes a default database called "postgres" so we just ride on that. - # -h makes postgres use the current directory look for its socket in $HERE - # ...except for some reason "postgres -k ." doesn't seem to actually write the socket file into $HERE, - # but it stops postgres from trying to write to /var/run/postgresql - #so I fall back on TCP, as usual. Which is fine, because that makes sniffing the traffic easier. +DB=$1 +if [ -z $DB ]; then + DB="postgres" # initdb makes a default database called "postgres" so we just ride on that as the default +fi + +PGDATA=`pwd`/data + +psql -d $DB -h $PGDATA #<-- this construction makes -h start with a slash which makes postgres interpret it as a path instead of an address which means psql connects over unix domain instead of tcp + + diff --git a/scratch/psql/init.sh b/scratch/psql/init.sh new file mode 100755 index 0000000..1536e79 --- /dev/null +++ b/scratch/psql/init.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +# usage: init.sh dbname +# constructs a postgres instance, the database "dbname", and installs the replication hooks +# once you run this, run server.sh to use the database + +pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null +cd $HERE + +if [ `uname` = "Darwin" ]; then + # OS X has several possible python distros + # our supported configuration uses Postgres.app, + # which is built linked against the Apple distro of python + # so we need to ensure the Apple python is the one loaded + # or else strange library errors will crop up + # + # Unfortunately, I'm having trouble building psycopg2 against the system python, so we clients (ie replicate.py) has to be run with Anaconda Python + + export PATH=/usr/bin:$PATH +fi + + +DB=$1 + +export PGDATA=$HERE/data +if [ ! -d $PGDATA ]; then + initdb +fi + +pg_ctl -w start -o "-k ." 
&& + +#<-- this construction makes psql connect over unix domain instead of tcp, since PGDATA is path, not a hostname +psql -d postgres -h $PGDATA </dev/null; HERE=`pwd`; popd >/dev/null cd $HERE -postgres -D ./data/ -k . +#"-k ." is the magic that means "put your socket file in your current directory" and not in /var/run which you would need to fight permissions on +postgres -D ./data/ -k . From b4a1a06f6772c8656f3b08440314e029dbd66250 Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 02:06:46 -0400 Subject: [PATCH 07/41] Switch to proper js inheritence: Constructor.call(this, ...) + merging prototypes as per http://stackoverflow.com/questions/4152931/javascript-inheritance-call-super-constructor-or-use-prototype-chain --- src/frontend/cacheset.js | 134 +++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 69 deletions(-) diff --git a/src/frontend/cacheset.js b/src/frontend/cacheset.js index bc462e8..7f6d02d 100644 --- a/src/frontend/cacheset.js +++ b/src/frontend/cacheset.js @@ -26,6 +26,15 @@ * TODO: clean up terminology; decide between parents,children and dependees,dependents */ + +/* Vision for partial server-side querying + * if our query ops are compatible with SQL (or at least, if some of them are) + * then perhaps we could do something like just quoting the syntax we use to do querying in js and sending that to the server + * or do the AST-object idea where we have a parallel tree of RCacheSet objects (R for "Remote") + * + * the best would be to have a situation where we can switch between RCacheSet and CacheSet objects just by renaming, and mix and match at will, + * easy enough that we can do performance testing--perhaps even hotspot performance testing--to figure out which queries should be server side and which should be client side + */ // how do I extend the array type? 
@@ -61,35 +70,6 @@ function is(a,b) { // maybe we should write it that .delete() is special cased to use isEqual } -/* helper method which many of the things share - * NB: .findIndex is a gecko extension, which we've imported by a polyfill in libs - * this - */ -function findIndex(self, e) { - return self._cache.findIndex(function(g) { return is(g,e); }); -} - -function add(self, e) { - self._cache.push(e); - //TODO: keep sorted - - self.trigger("insert", e); - self.trigger("rerender"); -} - -function remove(self, e) { - // find an element equal to e; note: there might be more than one! - i = findIndex(self, e); - //i = self._cache.indexOf(e); - //i = self._cache.findIndex(function(g) { return _.isEqual(g, e); }) - //TODO: once the caches are kept sorted, use a binary search instead - - if(i >= 0) { //silly bug: "if(i)" is false if i==0, but i==0 is a valid result from indexOf - self._cache.splice(i, 1); - self.trigger("delete", e); - self.trigger("rerender"); - } -} @@ -126,12 +106,8 @@ function remove(self, e) { * TODO: implement iterators or element access or something (rather than telling clients to just use ._cache); or, subclass Array and use its built in indexers */ function CacheSet(seed) { - // if 'new' was not used - if(! (this instanceof CacheSet)) return new CacheSet(seed); - if(seed === null) { seed = new Array(); } this._cache = seed.slice(0); //shallow copy the seed array - } _.extend(CacheSet.prototype, PourOver.Events); // TODO: use Backbone.Events instead, since that's the original source // alternately, roll these ideas into PourOver, though that will be difficult without breaking PourOver, or at least its zen. 
@@ -139,17 +115,40 @@ _.extend(CacheSet.prototype, PourOver.Events); // TODO: use Backbone.Events inst CacheSet.prototype.insert = function(e) { - add(this, e); + this._cache.push(e); + //TODO: keep sorted + + this.trigger("insert", e); + this.trigger("rerender"); } CacheSet.prototype.delete = function(e) { - remove(this, e); + // find an element equal to e; note: there might be more than one! + i = this.findIndex(this, e); + //i = this._cache.indexOf(e); + //i = this._cache.findIndex(function(g) { return _.isEqual(g, e); }) + //TODO: once the caches are kept sorted, use a binary search instead + + if(i >= 0) { //silly bug: "if(i)" is false if i==0, but i==0 is a valid result from indexOf + this._cache.splice(i, 1); + this.trigger("delete", e); + this.trigger("rerender"); + } } - CacheSet.prototype.subset = function(pred) { return new SubSet(this, pred); } + + +/* helper method which many of the things share + * NB: .findIndex is a gecko extension, which we've imported by a polyfill in libs + * this + */ +CacheSet.prototype.findIndex = function(e) { + return this._cache.findIndex(function(g) { return is(g,e); }); +} + /* A SubSet is a filtered slice of a CacheSet * and is itself considered a CacheSet * Like a SQL View, a SubSet @@ -160,26 +159,29 @@ CacheSet.prototype.subset = function(pred) { // \forall `e`: `e in P` and `pred(e)` => `e in S` */ function SubSet(parent, pred) { - - // if 'new' was not used - if(! 
(this instanceof SubSet)) return new SubSet(parent, pred); + var self = this; //XXX this would be more elegant if it was a subclass of CacheSet, // but I don't know how to do js inheritance properly // also, it's not clear if a SubSet should allow direct .insert() and .delete()s // TODO: implement .subset(), at least - var self = this; + + CacheSet.call(this, parent._cache.filter(pred)) //call the super constructor + + // my goal is to avoid having to redefine the methods like insert() and delete() which should come for free + // in particular, I am *not* rewriting the conviencence methods .and(), .or(), etc; I'm putting them in one place, at the top + // I also want to run some but maybe not all of the initialization code from the + self._pred = pred; - self._cache = parent._cache.filter(self._pred); // whoops; .on() runs its callback in the scope of parent. // bah. dynamic scopppinggggg!!! parent.on("insert", function(e) { - if(self._pred(e)) { - add(self, e); + if(self._pred(e)) { //careful: this is 'parent' in here, not 'self' + this.insert(self, e); } }); @@ -193,21 +195,16 @@ function SubSet(parent, pred) { // e *is* in self if(self._pred(e)) { - remove(self, e); + this.delete(self, e); } }); } -SubSet.prototype.subset = function(pred) { - return new SubSet(this, pred); -} -_.extend(SubSet.prototype, PourOver.Events); - +_.extend(SubSet.prototype, CacheSet.prototype); +// the prototype needs to be cloned from the parent? // TODO: write a o_cmp which can be used by Array.sort() to order objects; by default, sort() misbehaves on objects function And(A, B) { - // if 'new' was not used - if(! 
(this instanceof And)) return new And(A, B); var self = this; self._A = A; @@ -216,9 +213,9 @@ function And(A, B) { // Compute (A and B) as [e for e in A if e in B] //TODO: exploit sorting to make this faster // as written, this is an O(n^2) step - self._cache = self._A._cache.filter(function(a) { - return self._B._cache.indexOf(a) != -1; - }); + CacheSet.call(this, A._cache.filter(function(a) { + return B._cache.indexOf(a) != -1; + })); // now, an incoming item can only come in if it is BOTH in A and in B // that means we cannot add(self, e) until we have heard A.on("insert", e) and B.on("insert", e) @@ -239,7 +236,7 @@ function And(A, B) { // 1) check if e is in B's limbo if((i = B_limbo.indexOf(e)) != -1) { B_limbo.splice(i, 1); - add(self, e); + self.insert(self, e); } else { // 2) we "haven't" seen e yet; queue it A_limbo.push(e); //TODO: keep sortttted @@ -258,7 +255,7 @@ function And(A, B) { } else { // 2) we "haven't" seen e yet; queue it A_limbo.push(e); //TODO: keep sortttted - remove(self, e); + self.delete(self, e); } }); @@ -267,7 +264,7 @@ function And(A, B) { // 1) check if e is in B's limbo if((i = A_limbo.indexOf(e)) != -1) { A_limbo.splice(i, 1); - add(self, e); + self.insert(self, e); } else { // 2) we "haven't" seen e yet; queue it B_limbo.push(e); //TODO: keep sortttted @@ -286,18 +283,19 @@ function And(A, B) { } else { // 2) we "haven't" seen e yet; queue it B_limbo.push(e); //TODO: keep sortttted - remove(self, e); + self.delete(self, e); } }); } -_.extend(And.prototype, PourOver.Events); +_.extend(And.prototype, CacheSet.prototype); // I really need some sort of SortedArray type which has, like, merge() and filter() ops function Or(A, B) { - if(! 
(this instanceof Or)) return new Or(A, B); var self = this; + CacheSet.call(this, A._cache.concat(B._cache)); + self._A = A; self._B = B; @@ -307,7 +305,7 @@ function Or(A, B) { // in fact, it's *even worse* here than in And(), since not only is there the n^2 And step, then there's a tedious n^2 filtering out step // this pains me so much - self._cache = self._A._cache.concat(self._B._cache); + intersection = self._A._cache.filter(function(a) { return self._B._cache.indexOf(a) != -1; }); @@ -321,7 +319,7 @@ function Or(A, B) { } } -_.extend(Or.prototype, PourOver.Events); +_.extend(Or.prototype, CacheSet.prototype); //TODO: maybe rename "SubSet" "Where" @@ -340,19 +338,19 @@ function select(o, fields) { function Map(parent, m) { var self = this; - self._cache = parent._cache.map(m); + CacheSet.call(this, parent._cache.map(m)); parent.on("insert", function(e) { e = m(e); - add(self, e); + self.insert(self, e); //warning! 'this' is not 'self' within these event handlers!! }); parent.on("delete", function(e) { // as in "SubSet", we have an invariant that implies that if we actually see a delete we know necessarily we will perform a delete e = m(e); - remove(self, e); + self.delete(self, e); }); } -_.extend(Map.prototype, PourOver.Events); +_.extend(Map.prototype, CacheSet.prototype); /* Columns: select only the given fields @@ -393,16 +391,14 @@ function setstr(s) { function test_cacheset() { console.info("CACHESET TESTS"); -console.log(P); +window.P = P; //DEBUG var norse_monsters = P.subset(function(m) { return m.mythology == "norse" }); var sitting_monsters = P.subset(function(m) { return _(m.hobbies).contains("sitting") }); - console.log("norse_monsters = ", setstr(norse_monsters)); console.log("sitting_monsters = ", setstr(sitting_monsters)); - var names = Scalar(P, "name"); console.log("scalar[names] = ", names._cache); From 31e466e2b03c10005f7f8dcb38fd7544bcc49f9f Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 02:23:37 -0400 Subject: [PATCH 08/41] 
Add CacheSet sugar: convenience operators as methods Now that I've figured out JS inheritance, doing this was reasonable. --- src/frontend/cacheset.js | 68 +++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/src/frontend/cacheset.js b/src/frontend/cacheset.js index 7f6d02d..5b2a05f 100644 --- a/src/frontend/cacheset.js +++ b/src/frontend/cacheset.js @@ -109,20 +109,18 @@ function CacheSet(seed) { if(seed === null) { seed = new Array(); } this._cache = seed.slice(0); //shallow copy the seed array } -_.extend(CacheSet.prototype, PourOver.Events); // TODO: use Backbone.Events instead, since that's the original source -// alternately, roll these ideas into PourOver, though that will be difficult without breaking PourOver, or at least its zen. - - -CacheSet.prototype.insert = function(e) { +// core CacheSet API +_.extend(CacheSet.prototype, { +insert: function(e) { this._cache.push(e); //TODO: keep sorted this.trigger("insert", e); this.trigger("rerender"); -} +}, -CacheSet.prototype.delete = function(e) { +delete: function(e) { // find an element equal to e; note: there might be more than one!
i = this.findIndex(this, e); //i = this._cache.indexOf(e); @@ -135,9 +133,28 @@ CacheSet.prototype.delete = function(e) { this.trigger("rerender"); } } -CacheSet.prototype.subset = function(pred) { - return new SubSet(this, pred); -} +}) + +//operators +_.extend(CacheSet.prototype, { +// filtering operators +and: function(B) { return new And(this, B); }, +or: function(B) { return new Or(this, B); }, +subset: function(pred) { return new SubSet(this, pred); }, +distinct: function() { return new Distinct(this); }, + +// map operators +map: function(m) { return new Map(this, m); }, +select: function(fields) { return Select(this, fields); /*CAREFUL: 'new' is WRONG here*/ }, +scalar: function(field) { return Scalar(this, field); /*CAREFUL: 'new' is WRONG here*/ }, + +// reduce operators +// TODO + +}) + +_.extend(CacheSet.prototype, PourOver.Events); // TODO: use Backbone.Events instead, since that's the original source +// alternately, roll these ideas into PourOver, though that will be difficult without breaking PourOver, or at least its zen. 
@@ -236,7 +253,7 @@ function And(A, B) { // 1) check if e is in B's limbo if((i = B_limbo.indexOf(e)) != -1) { B_limbo.splice(i, 1); - self.insert(self, e); + self.insert(e); } else { // 2) we "haven't" seen e yet; queue it A_limbo.push(e); //TODO: keep sortttted @@ -255,7 +272,7 @@ function And(A, B) { } else { // 2) we "haven't" seen e yet; queue it A_limbo.push(e); //TODO: keep sortttted - self.delete(self, e); + self.delete(e); } }); @@ -264,7 +281,7 @@ function And(A, B) { // 1) check if e is in B's limbo if((i = A_limbo.indexOf(e)) != -1) { A_limbo.splice(i, 1); - self.insert(self, e); + self.insert(e); } else { // 2) we "haven't" seen e yet; queue it B_limbo.push(e); //TODO: keep sortttted @@ -283,7 +300,7 @@ function And(A, B) { } else { // 2) we "haven't" seen e yet; queue it B_limbo.push(e); //TODO: keep sortttted - self.delete(self, e); + self.delete(e); } }); } @@ -342,12 +359,12 @@ function Map(parent, m) { parent.on("insert", function(e) { e = m(e); - self.insert(self, e); //warning! 'this' is not 'self' within these event handlers!! + self.insert(e); //warning! 'this' is not 'self' within these event handlers!! 
}); parent.on("delete", function(e) { // as in "SubSet", we have an invariant that implies that if we actually see a delete we know necessarily we will perform a delete e = m(e); - self.delete(self, e); + self.delete(e); }); } _.extend(Map.prototype, CacheSet.prototype); @@ -384,7 +401,8 @@ var monsters = [{name: "sphinx", mythology: "greek", eyes: 2, sex: "f", hobbies: var P = new CacheSet(monsters); function setstr(s) { - return Scalar(s, "name")._cache + //return Scalar(s, "name")._cache + return _(s._cache).pluck("name") } @@ -399,24 +417,30 @@ var sitting_monsters = P.subset(function(m) { return _(m.hobbies).contains("sitt console.log("norse_monsters = ", setstr(norse_monsters)); console.log("sitting_monsters = ", setstr(sitting_monsters)); -var names = Scalar(P, "name"); -console.log("scalar[names] = ", names._cache); +qq = P._cache.map(function(e) { + return e["name"]; + }); +console.info(qq) +var names = P.scalar("name"); +console.log("scalar[names] = ", JSON.stringify(names._cache)); -var norse_and_sitting = new And(norse_monsters, sitting_monsters); +var norse_and_sitting = norse_monsters.and(sitting_monsters); console.log("norse AND sitting = ", setstr(norse_and_sitting)); -var norse_or_sitting = new Or(norse_monsters, sitting_monsters); +var norse_or_sitting = norse_monsters.or(sitting_monsters); console.log("norse OR sitting = ", setstr(norse_or_sitting)); console.info(""); -var norse_view = Select(norse_monsters, ["name", "mythology"]); +var norse_view = norse_monsters.select(["name", "mythology"]); var n = {name: "ogabooga", mythology: "norse", eyes: 17, sex: "t", hobbies: ["staring","scaring","sitting","crying"]}; +console.log("scalar[names] = ", JSON.stringify(names._cache)); console.log("adding", n); P.insert(n); +console.log("scalar[names] = ", JSON.stringify(names._cache)); console.log("norse_monsters = ", setstr(norse_monsters)); console.log("norse_view = ", JSON.stringify(norse_view._cache)); console.log("sitting_monsters = ", 
setstr(sitting_monsters)); From 9c2c9a4bfcfaab86acd6302c2bc0ea8159e3a012 Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 02:31:24 -0400 Subject: [PATCH 09/41] Switch CacheSet -> Table and SubSet -> Where to emulate SQL also a small nicety fix for init.sh and make HDRJCacheSet (cum Table) inherit the same as the other classes. --- scratch/psql/init.sh | 6 +++ src/frontend/cacheset.js | 91 ++++++++++++++++++-------------------- src/frontend/setgraph.html | 31 ++++++------- 3 files changed, 66 insertions(+), 62 deletions(-) diff --git a/scratch/psql/init.sh b/scratch/psql/init.sh index 1536e79..315f11e 100755 --- a/scratch/psql/init.sh +++ b/scratch/psql/init.sh @@ -3,6 +3,8 @@ # constructs a postgres instance, the database "dbname", and installs the replication hooks # once you run this, run server.sh to use the database + + pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null cd $HERE @@ -20,6 +22,9 @@ fi DB=$1 +if [ -z $DB ]; then + DB="postgres" +fi export PGDATA=$HERE/data if [ ! -d $PGDATA ]; then @@ -34,6 +39,7 @@ CREATE DATABASE $DB; EOF # install the replication hook +# TODO: do we want to do this installation in replicate.py instead? psql -d $DB -h $PGDATA < ./replicate.pysql && # shutdown diff --git a/src/frontend/cacheset.js b/src/frontend/cacheset.js index 5b2a05f..6d5b3d6 100644 --- a/src/frontend/cacheset.js +++ b/src/frontend/cacheset.js @@ -1,10 +1,10 @@ -/* cacheset: an in-memory in-javascript database system based around multisets instead of around tables or documents (though any can be translated to the others) and around dataflow programming. +/* Table: an in-memory in-javascript database system based around multisets instead of around tables or documents (though any can be translated to the others) and around dataflow programming. * - * The system provides subtypes (subset, and, or, not, columns, distinct, sum, average, min, max, groupBy, ...). 
+ * The system provides subtypes (Where, and, or, not, columns, distinct, sum, average, min, max, groupBy, ...). * with each type maintaining a live cache of its current state, as computed from whatever it is based on (necessarily, then, the supported operations are limited to whatever can be efficiently computed on a stream; limited to roughly what SQL provides, in fact). * The main concept here is the caching, which is a lot like postgres's materialized view; but unlike postgres's implementation, this does edge-triggered processing: it reacts to changes pushed from a source instead of having to poll a source to get a complete new copy. - * Since JS is pointers-everywhere, for the types which compute new sets (subset, and, or, not) are storage-cheap: each element only actually exists once; the storage requirement for each type is only sizeof(js_ptr)*cache.length. The other types do not have this guarantee (in particular, Columns has to create new objects) + * Since JS is pointers-everywhere, for the types which compute new sets (Where, and, or, not) are storage-cheap: each element only actually exists once; the storage requirement for each type is only sizeof(js_ptr)*cache.length. The other types do not have this guarantee (in particular, Columns has to create new objects) * * * Dependent types listen to their parent's "insert" and "delete" events; when a parent's cache is updated it fires an event, which causes a chain reaction of dependents to check if they should update their cache, and fire their events. @@ -13,11 +13,11 @@ * Garbage Collection: * - it is important that dependees hold strong references to their parents and that parents hold (at most) weak references to their children * e.g. 
if you say - * var C = new CacheSet([...]); - * var large = C.subset(function(e) { e.width > 9 }); + * var C = new Table([...]); + * var large = C.Where(function(e) { e.width > 9 }); * var v = large.columns(["name", "type"]).distinct(); * then you have must have a chain of strong references - * v (a Distinct) -> Columns -> SubSet -> CacheSet + * v (a Distinct) -> Columns -> Where -> Table * which is good and right because v ultimately depends on all of those * but if you * delete v; (or otherwise lose its reference) @@ -30,9 +30,9 @@ /* Vision for partial server-side querying * if our query ops are compatible with SQL (or at least, if some of them are) * then perhaps we could do something like just quoting the syntax we use to do querying in js and sending that to the server - * or do the AST-object idea where we have a parallel tree of RCacheSet objects (R for "Remote") + * or do the AST-object idea where we have a parallel tree of RTable objects (R for "Remote") * - * the best would be to have a situation where we can switch between RCacheSet and CacheSet objects just by renaming, and mix and match at will, + * the best would be to have a situation where we can switch between RTable and Table objects just by renaming, and mix and match at will, * easy enough that we can do performance testing--perhaps even hotspot performance testing--to figure out which queries should be server side and which should be client side */ @@ -40,7 +40,7 @@ // how do I extend the array type? // do I want to extend the array type? 
-// CacheSet has an API similar to but only overlapping with--not inheriting--the Array API +// Table has an API similar to but only overlapping with--not inheriting--the Array API // hence, we wrap an array // Notably, *this type is not actually a set* but rather a multiset since it allows multiple copies of a single item // (Use Case: cloning SQL db tables; in practice, all SQL rows should have a unique ID, which automatically, but since SQL allows this not to be true I need to support it not being true) @@ -52,7 +52,7 @@ //var _ = require('../../assets/libs/underscore.js'); //?? //var PourOver = require('../../assets/libs/pourover.js'); -//module.exports = CacheSet +//module.exports = Table /* is essentially defines what we consider to be an equal object * we could also use underscore's _.isEqual() which does recursive value comparison @@ -105,13 +105,13 @@ function is(a,b) { * TODO: implement .length * TODO: implement iterators or element access or something (rather than telling clients to just use ._cache); or, subclass Array and use its built in indexers */ -function CacheSet(seed) { +function Table(seed) { if(seed === null) { seed = new Array(); } this._cache = seed.slice(0); //shallow copy the seed array } -// core CacheSet API -_.extend(CacheSet.prototype, { +// core Table API +_.extend(Table.prototype, { insert: function(e) { this._cache.push(e); //TODO: keep sorted @@ -136,11 +136,11 @@ delete: function(e) { }) //operators -_.extend(CacheSet.prototype, { +_.extend(Table.prototype, { // filtering operators and: function(B) { return new And(this, B); }, or: function(B) { return new Or(this, B); }, -subset: function(pred) { return new SubSet(this, pred); }, +Where: function(pred) { return new Where(this, pred); }, distinct: function() { return new Distinct(this); }, // map operators @@ -153,7 +153,7 @@ scalar: function(field) { return Scalar(this, field); /*CAREFUL: 'new' is WRONG }) -_.extend(CacheSet.prototype, PourOver.Events); // TODO: use 
Backbone.Events instead, since that's the original source +_.extend(Table.prototype, PourOver.Events); // TODO: use Backbone.Events instead, since that's the original source // alternately, roll these ideas into PourOver, though that will be difficult without breaking PourOver, or at least its zen. @@ -162,29 +162,29 @@ _.extend(CacheSet.prototype, PourOver.Events); // TODO: use Backbone.Events inst * NB: .findIndex is a gecko extension, which we've imported by a polyfill in libs * this */ -CacheSet.prototype.findIndex = function(e) { +Table.prototype.findIndex = function(e) { return this._cache.findIndex(function(g) { return is(g,e); }); } -/* A SubSet is a filtered slice of a CacheSet - * and is itself considered a CacheSet - * Like a SQL View, a SubSet - * but a SubSet is more like a Materialized View (c.f. Postgres; also this hack in MySQL: http://www.fromdual.com/mysql-materialized-views) - // a SubSet S = SubSet(P, pred) by definition is supposed to maintain the invariant that +/* A Where is a filtered slice of a Table + * and is itself considered a Table + * Like a SQL View, a Where + * but a Where is more like a Materialized View (c.f. 
Postgres; also this hack in MySQL: http://www.fromdual.com/mysql-materialized-views) + // a Where S = Where(P, pred) by definition is supposed to maintain the invariant that // S = { e for e in P if pred(e) } // which equivalently means // \forall `e`: `e in P` and `pred(e)` => `e in S` */ -function SubSet(parent, pred) { +function Where(parent, pred) { var self = this; - //XXX this would be more elegant if it was a subclass of CacheSet, + //XXX this would be more elegant if it was a subclass of Table, // but I don't know how to do js inheritance properly - // also, it's not clear if a SubSet should allow direct .insert() and .delete()s - // TODO: implement .subset(), at least + // also, it's not clear if a Where should allow direct .insert() and .delete()s + // TODO: implement .Where(), at least - CacheSet.call(this, parent._cache.filter(pred)) //call the super constructor + Table.call(this, parent._cache.filter(pred)) //call the super constructor // my goal is to avoid having to redefine the methods like insert() and delete() which should come for free // in particular, I am *not* rewriting the conviencence methods .and(), .or(), etc; I'm putting them in one place, at the top @@ -216,7 +216,7 @@ function SubSet(parent, pred) { } }); } -_.extend(SubSet.prototype, CacheSet.prototype); +_.extend(Where.prototype, Table.prototype); // the prototype needs to be cloned from the parent? 
// TODO: write a o_cmp which can be used by Array.sort() to order objects; by default, sort() misbehaves on objects @@ -230,7 +230,7 @@ function And(A, B) { // Compute (A and B) as [e for e in A if e in B] //TODO: exploit sorting to make this faster // as written, this is an O(n^2) step - CacheSet.call(this, A._cache.filter(function(a) { + Table.call(this, A._cache.filter(function(a) { return B._cache.indexOf(a) != -1; })); @@ -304,14 +304,14 @@ function And(A, B) { } }); } -_.extend(And.prototype, CacheSet.prototype); +_.extend(And.prototype, Table.prototype); // I really need some sort of SortedArray type which has, like, merge() and filter() ops function Or(A, B) { var self = this; - CacheSet.call(this, A._cache.concat(B._cache)); + Table.call(this, A._cache.concat(B._cache)); self._A = A; self._B = B; @@ -336,10 +336,10 @@ function Or(A, B) { } } -_.extend(Or.prototype, CacheSet.prototype); +_.extend(Or.prototype, Table.prototype); -//TODO: maybe rename "SubSet" "Where" +//TODO: maybe rename "Where" "Where" // and "Columns" "Select" to match SQL function select(o, fields) { @@ -355,19 +355,19 @@ function select(o, fields) { function Map(parent, m) { var self = this; - CacheSet.call(this, parent._cache.map(m)); + Table.call(this, parent._cache.map(m)); parent.on("insert", function(e) { e = m(e); self.insert(e); //warning! 'this' is not 'self' within these event handlers!! 
}); - parent.on("delete", function(e) { // as in "SubSet", we have an invariant that implies that if we actually see a delete we know necessarily we will perform a delete + parent.on("delete", function(e) { // as in "Where", we have an invariant that implies that if we actually see a delete we know necessarily we will perform a delete e = m(e); self.delete(e); }); } -_.extend(Map.prototype, CacheSet.prototype); +_.extend(Map.prototype, Table.prototype); /* Columns: select only the given fields @@ -389,7 +389,7 @@ function Scalar(parent, field) { }); } -// TODO: implement all the PourOver filters as functions that construct predicates and then return a SubSet +// TODO: implement all the PourOver filters as functions that construct predicates and then return a Where // TODO: implement view disposal var monsters = [{name: "sphinx", mythology: "greek", eyes: 2, sex: "f", hobbies: ["riddles","sitting","being a wonder"]}, @@ -398,7 +398,7 @@ var monsters = [{name: "sphinx", mythology: "greek", eyes: 2, sex: "f", hobbies: {name: "cyclops", mythology: "greek", eyes: 1, sex: "m", hobbies: ["staring","terrorizing","sitting"]}, {name: "fenrir", mythology: "norse", eyes: 2, sex: "m", hobbies: ["growing","god-killing","sitting"]}, {name: "medusa", mythology: "greek", eyes: 2, sex: "f", hobbies: ["coiling","staring"]}]; -var P = new CacheSet(monsters); +var P = new Table(monsters); function setstr(s) { //return Scalar(s, "name")._cache @@ -406,21 +406,18 @@ function setstr(s) { } -function test_cacheset() { +function test_Table() { -console.info("CACHESET TESTS"); +console.info("Table TESTS"); window.P = P; //DEBUG -var norse_monsters = P.subset(function(m) { return m.mythology == "norse" }); -var sitting_monsters = P.subset(function(m) { return _(m.hobbies).contains("sitting") }); +var norse_monsters = P.Where(function(m) { return m.mythology == "norse" }); +var sitting_monsters = P.Where(function(m) { return _(m.hobbies).contains("sitting") }); +window.norse_monsters = 
norse_monsters; //DEBUG console.log("norse_monsters = ", setstr(norse_monsters)); console.log("sitting_monsters = ", setstr(sitting_monsters)); -qq = P._cache.map(function(e) { - return e["name"]; - }); -console.info(qq) var names = P.scalar("name"); console.log("scalar[names] = ", JSON.stringify(names._cache)); @@ -478,7 +475,7 @@ console.log("norse OR sitting = ", setstr(norse_or_sitting)); console.info(""); } -//test_cacheset(); +test_Table(); /************************* diff --git a/src/frontend/setgraph.html b/src/frontend/setgraph.html index 56f0087..e6f7f5b 100644 --- a/src/frontend/setgraph.html +++ b/src/frontend/setgraph.html @@ -42,34 +42,35 @@ * the address should be a websocket (ws:// or wss://) URL where a replicate.py server is listening. */ // XXX is it possible to write this as a mixin? it would require some way to hook the constructor. -function HDRJCacheSet(address) { - var set = new CacheSet([]); - var feed = new HDRJ(address); +function HDRJTable(address) { - feed.on("insert", function(row) { - set.insert(row); + var self = this; + Table.call(this, []); + this.feed = new HDRJ(address); + + this.feed.on("insert", function(row) { + self.insert(row); //warning: 'this' is not 'self' inside these handlers }) - feed.on("update", function(old_row, new_row) { - set.delete(old_row); - set.insert(new_row); + this.feed.on("update", function(old_row, new_row) { + self.delete(old_row); + self.insert(new_row); }) - feed.on("delete", function(row) { - set.delete(row); + this.feed.on("delete", function(row) { + self.delete(row); }) - - return set; } +_.extend(HDRJTable.prototype, Table.prototype); - var DB = new HDRJCacheSet("ws://" + location.hostname + ":8081"); + var DB = new HDRJTable("ws://" + location.hostname + ":8081"); DB.on("rerender", function() { //console.log("The Films are now ", JSON.stringify(films._cache)) console.log("Now have",this._cache.length, "rows."); }) - var wheat = new Select(DB, ["time", "durumWheatConventional"]) - var organic_wheat 
= new Select(DB, ["time", "durumWheatGreen"]) + var wheat = DB.select(["time", "durumWheatConventional"]) + var organic_wheat = DB.select(["time", "durumWheatGreen"]) wheat.on("rerender", function() { console.log("Conventional Wheat, by time, across all runs: ", _(_(this._cache).sortBy(function(o) { return o['time']})).pluck("durumWheatConventional")); }); From cd3a67927a183836a7be55c3ccfe0cd251f052bf Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 02:37:29 -0400 Subject: [PATCH 10/41] Cleaning up the psql directory, in prep for merging into main --- scratch/psql/{ => attic}/FUD.txt | 0 scratch/psql/{ => attic}/WAL/README.md | 0 scratch/psql/{ => attic}/WAL/messages.py | 0 scratch/psql/{ => attic}/WAL/replicant.py | 0 scratch/psql/{ => attic}/WAL/replication_messages.py | 0 scratch/psql/{ => attic/WAL}/util.py | 0 scratch/psql/{ => attic}/experiments/forkit.py | 0 scratch/psql/{ => attic}/experiments/infiniteresult.py | 0 scratch/psql/{ => attic}/experiments/instead_of.pysql | 0 scratch/psql/{ => attic}/experiments/ni.py | 0 scratch/psql/{ => attic}/experiments/nonblockingpipe.py | 0 scratch/psql/{ => attic/experiments}/sel.py | 0 scratch/psql/{ => attic}/experiments/sqlalchemysh | 0 scratch/psql/{ => attic}/experiments/subtransaction.pysql | 0 scratch/psql/{ => attic}/experiments/t.pysql | 0 scratch/psql/{ => attic}/experiments/unixsocket.py | 0 scratch/psql/{ => attic}/experiments/velvet_client.py | 0 scratch/psql/{ => attic}/experiments/velvet_server.py | 0 scratch/psql/{ => attic/experiments}/websockify.md | 0 scratch/psql/{ => attic}/replicate.v1.py | 0 scratch/psql/{ => attic}/watch.psql | 0 scratch/psql/replicate.pysql | 3 ++- scratch/psql/test.replicant.sh | 7 ------- scratch/psql/{ => tests}/films.sql | 0 scratch/psql/{ => tests}/test.psql | 0 25 files changed, 2 insertions(+), 8 deletions(-) rename scratch/psql/{ => attic}/FUD.txt (100%) rename scratch/psql/{ => attic}/WAL/README.md (100%) rename scratch/psql/{ => attic}/WAL/messages.py (100%) 
rename scratch/psql/{ => attic}/WAL/replicant.py (100%) rename scratch/psql/{ => attic}/WAL/replication_messages.py (100%) rename scratch/psql/{ => attic/WAL}/util.py (100%) rename scratch/psql/{ => attic}/experiments/forkit.py (100%) rename scratch/psql/{ => attic}/experiments/infiniteresult.py (100%) rename scratch/psql/{ => attic}/experiments/instead_of.pysql (100%) rename scratch/psql/{ => attic}/experiments/ni.py (100%) rename scratch/psql/{ => attic}/experiments/nonblockingpipe.py (100%) rename scratch/psql/{ => attic/experiments}/sel.py (100%) rename scratch/psql/{ => attic}/experiments/sqlalchemysh (100%) rename scratch/psql/{ => attic}/experiments/subtransaction.pysql (100%) rename scratch/psql/{ => attic}/experiments/t.pysql (100%) rename scratch/psql/{ => attic}/experiments/unixsocket.py (100%) rename scratch/psql/{ => attic}/experiments/velvet_client.py (100%) rename scratch/psql/{ => attic}/experiments/velvet_server.py (100%) rename scratch/psql/{ => attic/experiments}/websockify.md (100%) rename scratch/psql/{ => attic}/replicate.v1.py (100%) rename scratch/psql/{ => attic}/watch.psql (100%) delete mode 100755 scratch/psql/test.replicant.sh rename scratch/psql/{ => tests}/films.sql (100%) rename scratch/psql/{ => tests}/test.psql (100%) diff --git a/scratch/psql/FUD.txt b/scratch/psql/attic/FUD.txt similarity index 100% rename from scratch/psql/FUD.txt rename to scratch/psql/attic/FUD.txt diff --git a/scratch/psql/WAL/README.md b/scratch/psql/attic/WAL/README.md similarity index 100% rename from scratch/psql/WAL/README.md rename to scratch/psql/attic/WAL/README.md diff --git a/scratch/psql/WAL/messages.py b/scratch/psql/attic/WAL/messages.py similarity index 100% rename from scratch/psql/WAL/messages.py rename to scratch/psql/attic/WAL/messages.py diff --git a/scratch/psql/WAL/replicant.py b/scratch/psql/attic/WAL/replicant.py similarity index 100% rename from scratch/psql/WAL/replicant.py rename to scratch/psql/attic/WAL/replicant.py diff --git 
a/scratch/psql/WAL/replication_messages.py b/scratch/psql/attic/WAL/replication_messages.py similarity index 100% rename from scratch/psql/WAL/replication_messages.py rename to scratch/psql/attic/WAL/replication_messages.py diff --git a/scratch/psql/util.py b/scratch/psql/attic/WAL/util.py similarity index 100% rename from scratch/psql/util.py rename to scratch/psql/attic/WAL/util.py diff --git a/scratch/psql/experiments/forkit.py b/scratch/psql/attic/experiments/forkit.py similarity index 100% rename from scratch/psql/experiments/forkit.py rename to scratch/psql/attic/experiments/forkit.py diff --git a/scratch/psql/experiments/infiniteresult.py b/scratch/psql/attic/experiments/infiniteresult.py similarity index 100% rename from scratch/psql/experiments/infiniteresult.py rename to scratch/psql/attic/experiments/infiniteresult.py diff --git a/scratch/psql/experiments/instead_of.pysql b/scratch/psql/attic/experiments/instead_of.pysql similarity index 100% rename from scratch/psql/experiments/instead_of.pysql rename to scratch/psql/attic/experiments/instead_of.pysql diff --git a/scratch/psql/experiments/ni.py b/scratch/psql/attic/experiments/ni.py similarity index 100% rename from scratch/psql/experiments/ni.py rename to scratch/psql/attic/experiments/ni.py diff --git a/scratch/psql/experiments/nonblockingpipe.py b/scratch/psql/attic/experiments/nonblockingpipe.py similarity index 100% rename from scratch/psql/experiments/nonblockingpipe.py rename to scratch/psql/attic/experiments/nonblockingpipe.py diff --git a/scratch/psql/sel.py b/scratch/psql/attic/experiments/sel.py similarity index 100% rename from scratch/psql/sel.py rename to scratch/psql/attic/experiments/sel.py diff --git a/scratch/psql/experiments/sqlalchemysh b/scratch/psql/attic/experiments/sqlalchemysh similarity index 100% rename from scratch/psql/experiments/sqlalchemysh rename to scratch/psql/attic/experiments/sqlalchemysh diff --git a/scratch/psql/experiments/subtransaction.pysql 
b/scratch/psql/attic/experiments/subtransaction.pysql similarity index 100% rename from scratch/psql/experiments/subtransaction.pysql rename to scratch/psql/attic/experiments/subtransaction.pysql diff --git a/scratch/psql/experiments/t.pysql b/scratch/psql/attic/experiments/t.pysql similarity index 100% rename from scratch/psql/experiments/t.pysql rename to scratch/psql/attic/experiments/t.pysql diff --git a/scratch/psql/experiments/unixsocket.py b/scratch/psql/attic/experiments/unixsocket.py similarity index 100% rename from scratch/psql/experiments/unixsocket.py rename to scratch/psql/attic/experiments/unixsocket.py diff --git a/scratch/psql/experiments/velvet_client.py b/scratch/psql/attic/experiments/velvet_client.py similarity index 100% rename from scratch/psql/experiments/velvet_client.py rename to scratch/psql/attic/experiments/velvet_client.py diff --git a/scratch/psql/experiments/velvet_server.py b/scratch/psql/attic/experiments/velvet_server.py similarity index 100% rename from scratch/psql/experiments/velvet_server.py rename to scratch/psql/attic/experiments/velvet_server.py diff --git a/scratch/psql/websockify.md b/scratch/psql/attic/experiments/websockify.md similarity index 100% rename from scratch/psql/websockify.md rename to scratch/psql/attic/experiments/websockify.md diff --git a/scratch/psql/replicate.v1.py b/scratch/psql/attic/replicate.v1.py similarity index 100% rename from scratch/psql/replicate.v1.py rename to scratch/psql/attic/replicate.v1.py diff --git a/scratch/psql/watch.psql b/scratch/psql/attic/watch.psql similarity index 100% rename from scratch/psql/watch.psql rename to scratch/psql/attic/watch.psql diff --git a/scratch/psql/replicate.pysql b/scratch/psql/replicate.pysql index 4c603f8..1384a61 100644 --- a/scratch/psql/replicate.pysql +++ b/scratch/psql/replicate.pysql @@ -6,6 +6,7 @@ -- TODO: -- [ ] Install these hooks into a spare tablespace -- [ ] Make sure these hooks can be deployed to production servers +-- [ ] Do load tests 
to make sure these hooks don't lag out the server; actually formatting and compacting the IP packets should perhaps be pushed out to an external process CREATE OR REPLACE LANGUAGE plpython2u; @@ -187,4 +188,4 @@ $$ # TODO: # this function should also uninstall triggers if there are no listeners for _table left -$$ LANGUAGE plpython2u; \ No newline at end of file +$$ LANGUAGE plpython2u; diff --git a/scratch/psql/test.replicant.sh b/scratch/psql/test.replicant.sh deleted file mode 100755 index 77499cf..0000000 --- a/scratch/psql/test.replicant.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env sh -# demonstrate how to run the replicant - -echo '{"table": "films"}' | ./replicant.py 2>/dev/null - -# in future, this will probably be: -#./replicant.py "films" 2>/dev/null diff --git a/scratch/psql/films.sql b/scratch/psql/tests/films.sql similarity index 100% rename from scratch/psql/films.sql rename to scratch/psql/tests/films.sql diff --git a/scratch/psql/test.psql b/scratch/psql/tests/test.psql similarity index 100% rename from scratch/psql/test.psql rename to scratch/psql/tests/test.psql From df4ea19620b9837b83258f8bab25461c33ee9b3e Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 02:39:17 -0400 Subject: [PATCH 11/41] Move psql into mainline. 
Now called src/backend/db --- scratch/psql/attic/README.md | 1 + {scratch/psql => src/backend/db}/README.md | 0 {scratch/psql => src/backend/db}/client.sh | 0 {scratch/psql => src/backend/db}/dump.sh | 0 {scratch/psql => src/backend/db}/init.sh | 0 {scratch/psql => src/backend/db}/replicate.py | 0 {scratch/psql => src/backend/db}/replicate.pysql | 0 {scratch/psql => src/backend/db}/replicate.sh | 0 {scratch/psql => src/backend/db}/replicate_server.sh | 0 {scratch/psql => src/backend/db}/server.sh | 0 {scratch/psql => src/backend/db}/tests/films.sql | 0 {scratch/psql => src/backend/db}/tests/test.psql | 0 12 files changed, 1 insertion(+) create mode 100644 scratch/psql/attic/README.md rename {scratch/psql => src/backend/db}/README.md (100%) rename {scratch/psql => src/backend/db}/client.sh (100%) rename {scratch/psql => src/backend/db}/dump.sh (100%) rename {scratch/psql => src/backend/db}/init.sh (100%) rename {scratch/psql => src/backend/db}/replicate.py (100%) rename {scratch/psql => src/backend/db}/replicate.pysql (100%) rename {scratch/psql => src/backend/db}/replicate.sh (100%) rename {scratch/psql => src/backend/db}/replicate_server.sh (100%) rename {scratch/psql => src/backend/db}/server.sh (100%) rename {scratch/psql => src/backend/db}/tests/films.sql (100%) rename {scratch/psql => src/backend/db}/tests/test.psql (100%) diff --git a/scratch/psql/attic/README.md b/scratch/psql/attic/README.md new file mode 100644 index 0000000..4903430 --- /dev/null +++ b/scratch/psql/attic/README.md @@ -0,0 +1 @@ +This is the cruft code generated in writing postgres realtime replication, which now lives in src/backend/db diff --git a/scratch/psql/README.md b/src/backend/db/README.md similarity index 100% rename from scratch/psql/README.md rename to src/backend/db/README.md diff --git a/scratch/psql/client.sh b/src/backend/db/client.sh similarity index 100% rename from scratch/psql/client.sh rename to src/backend/db/client.sh diff --git a/scratch/psql/dump.sh 
b/src/backend/db/dump.sh similarity index 100% rename from scratch/psql/dump.sh rename to src/backend/db/dump.sh diff --git a/scratch/psql/init.sh b/src/backend/db/init.sh similarity index 100% rename from scratch/psql/init.sh rename to src/backend/db/init.sh diff --git a/scratch/psql/replicate.py b/src/backend/db/replicate.py similarity index 100% rename from scratch/psql/replicate.py rename to src/backend/db/replicate.py diff --git a/scratch/psql/replicate.pysql b/src/backend/db/replicate.pysql similarity index 100% rename from scratch/psql/replicate.pysql rename to src/backend/db/replicate.pysql diff --git a/scratch/psql/replicate.sh b/src/backend/db/replicate.sh similarity index 100% rename from scratch/psql/replicate.sh rename to src/backend/db/replicate.sh diff --git a/scratch/psql/replicate_server.sh b/src/backend/db/replicate_server.sh similarity index 100% rename from scratch/psql/replicate_server.sh rename to src/backend/db/replicate_server.sh diff --git a/scratch/psql/server.sh b/src/backend/db/server.sh similarity index 100% rename from scratch/psql/server.sh rename to src/backend/db/server.sh diff --git a/scratch/psql/tests/films.sql b/src/backend/db/tests/films.sql similarity index 100% rename from scratch/psql/tests/films.sql rename to src/backend/db/tests/films.sql diff --git a/scratch/psql/tests/test.psql b/src/backend/db/tests/test.psql similarity index 100% rename from scratch/psql/tests/test.psql rename to src/backend/db/tests/test.psql From ac77aa45ea7b2cc26dc7a4e7ba96ed7840b1e4ae Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 02:49:27 -0400 Subject: [PATCH 12/41] Spec the target DB for replication on the command line instead of hardcoding. This now matches the stated API in commit ff01d36543b6f8b34235c5eb6b308742e6481a2e. 
--- src/backend/db/replicate.py | 16 ++++++++++------ src/backend/db/replicate_server.sh | 5 +++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/backend/db/replicate.py b/src/backend/db/replicate.py index 674ba3f..7db5997 100755 --- a/src/backend/db/replicate.py +++ b/src/backend/db/replicate.py @@ -55,10 +55,6 @@ logging.getLogger().setLevel(logging.DEBUG) -DB_SOCKET = "data" #path to folder containing the postgres socket -DB_SOCKET = os.path.abspath(DB_SOCKET) #postgres can't handle relative paths -DB_CONN_STRING = "postgresql:///postgres?host=%s" % (DB_SOCKET,) -E = sqlalchemy.create_engine(DB_CONN_STRING) #TODO: deglobalize #import IPython; IPython.embed() @@ -215,7 +211,15 @@ def replicatethread(table, ctl_sock): if __name__ == '__main__': import sys - table = sys.argv[1] + postgres, table = sys.argv[1:] + + assert postgres.startswith("postgresql://"), "Must be a valid sqlalchemy-to-postgres connection string" #XXX there are connection strings which spec the driver to use which are valid too; this assertion is too strong + #DB_SOCKET = "data" #path to folder containing the postgres socket + #DB_SOCKET = os.path.abspath(DB_SOCKET) #postgres can't handle relative paths + #DB_CONN_STRING = "postgresql:///postgres?host=%s" % (DB_SOCKET,) + global E + E = sqlalchemy.create_engine(postgres) #TODO: deglobalize + ctl, ctl_slave = socket.socketpair() #when this socket closes all spools (all two of them, but it could be generalized) should shut down immediately @@ -244,4 +248,4 @@ def replicatethread(table, ctl_sock): finally: ctl.close() - RT.join() #give the replication a chanc eto clean up after itself \ No newline at end of file + RT.join() #give the replication a chanc eto clean up after itself diff --git a/src/backend/db/replicate_server.sh b/src/backend/db/replicate_server.sh index e0cf71f..46d22f4 100755 --- a/src/backend/db/replicate_server.sh +++ b/src/backend/db/replicate_server.sh @@ -12,9 +12,10 @@ # the multiple-client effect is 
achieved by socat TCP-LISTEN,...,fork [nc calls this -k] PORT="$1" -TABLE="$2" +DB="$2" +TABLE="$3" -SERVER="./replicate.sh ${TABLE}" +SERVER="./replicate.sh \"${DB}\" \"${TABLE}\"" # rather than use up a TCP port for the websockify <--> socat connection # use a unix domain socket instead From 9ad6e05c737f93275b57db6f5653e3b71b6afa2f Mon Sep 17 00:00:00 2001 From: kousu Date: Thu, 4 Sep 2014 02:51:51 -0400 Subject: [PATCH 13/41] Complete the CacheSet->Table naming transition --- src/frontend/setgraph.html | 6 +++--- src/frontend/{cacheset.js => tables.js} | 0 2 files changed, 3 insertions(+), 3 deletions(-) rename src/frontend/{cacheset.js => tables.js} (100%) diff --git a/src/frontend/setgraph.html b/src/frontend/setgraph.html index e6f7f5b..5e47e2f 100644 --- a/src/frontend/setgraph.html +++ b/src/frontend/setgraph.html @@ -34,7 +34,7 @@ - + + + +
+
+ +
+[Docs] [txt|pdf] [draft-ietf-aft-so...] [Diff1] [Diff2] [Errata]
+
+ PROPOSED STANDARD
+ Errata Exist
+
+Network Working Group                                           M. Leech
+Request for Comments: 1928                    Bell-Northern Research Ltd
+Category: Standards Track                                       M. Ganis
+                                         International Business Machines
+                                                                  Y. Lee
+                                                  NEC Systems Laboratory
+                                                                R. Kuris
+                                                       Unify Corporation
+                                                               D. Koblas
+                                                  Independent Consultant
+                                                                L. Jones
+                                                 Hewlett-Packard Company
+                                                              March 1996
+
+
+                        SOCKS Protocol Version 5
+
+Status of this Memo
+
+   This document specifies an Internet standards track protocol for the
+   Internet community, and requests discussion and suggestions for
+   improvements.  Please refer to the current edition of the "Internet
+   Official Protocol Standards" (STD 1) for the standardization state
+   and status of this protocol.  Distribution of this memo is unlimited.
+
+Acknowledgments
+
+   This memo describes a protocol that is an evolution of the previous
+   version of the protocol, version 4 [1]. This new protocol stems from
+   active discussions and prototype implementations.  The key
+   contributors are: Marcus Leech: Bell-Northern Research, David Koblas:
+   Independent Consultant, Ying-Da Lee: NEC Systems Laboratory, LaMont
+   Jones: Hewlett-Packard Company, Ron Kuris: Unify Corporation, Matt
+   Ganis: International Business Machines.
+
+1.  Introduction
+
+   The use of network firewalls, systems that effectively isolate an
+   organization's internal network structure from an exterior network,
+   such as the INTERNET, is becoming increasingly popular.  These
+   firewall systems typically act as application-layer gateways between
+   networks, usually offering controlled TELNET, FTP, and SMTP access.
+   With the emergence of more sophisticated application layer protocols
+   designed to facilitate global information discovery, there exists a
+   need to provide a general framework for these protocols to
+   transparently and securely traverse a firewall.
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 1]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   There exists, also, a need for strong authentication of such
+   traversal in as fine-grained a manner as is practical. This
+   requirement stems from the realization that client-server
+   relationships emerge between the networks of various organizations,
+   and that such relationships need to be controlled and often strongly
+   authenticated.
+
+   The protocol described here is designed to provide a framework for
+   client-server applications in both the TCP and UDP domains to
+   conveniently and securely use the services of a network firewall.
+   The protocol is conceptually a "shim-layer" between the application
+   layer and the transport layer, and as such does not provide network-
+   layer gateway services, such as forwarding of ICMP messages.
+
+2.  Existing practice
+
+   There currently exists a protocol, SOCKS Version 4, that provides for
+   unsecured firewall traversal for TCP-based client-server
+   applications, including TELNET, FTP and the popular information-
+   discovery protocols such as HTTP, WAIS and GOPHER.
+
+   This new protocol extends the SOCKS Version 4 model to include UDP,
+   and extends the framework to include provisions for generalized
+   strong authentication schemes, and extends the addressing scheme to
+   encompass domain-name and V6 IP addresses.
+
+   The implementation of the SOCKS protocol typically involves the
+   recompilation or relinking of TCP-based client applications to use
+   the appropriate encapsulation routines in the SOCKS library.
+
+Note:
+
+   Unless otherwise noted, the decimal numbers appearing in packet-
+   format diagrams represent the length of the corresponding field, in
+   octets.  Where a given octet must take on a specific value, the
+   syntax X'hh' is used to denote the value of the single octet in that
+   field. When the word 'Variable' is used, it indicates that the
+   corresponding field has a variable length defined either by an
+   associated (one or two octet) length field, or by a data type field.
+
+3.  Procedure for TCP-based clients
+
+   When a TCP-based client wishes to establish a connection to an object
+   that is reachable only via a firewall (such determination is left up
+   to the implementation), it must open a TCP connection to the
+   appropriate SOCKS port on the SOCKS server system.  The SOCKS service
+   is conventionally located on TCP port 1080.  If the connection
+   request succeeds, the client enters a negotiation for the
+
+
+
+Leech, et al                Standards Track                     [Page 2]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   authentication method to be used, authenticates with the chosen
+   method, then sends a relay request.  The SOCKS server evaluates the
+   request, and either establishes the appropriate connection or denies
+   it.
+
+   Unless otherwise noted, the decimal numbers appearing in packet-
+   format diagrams represent the length of the corresponding field, in
+   octets.  Where a given octet must take on a specific value, the
+   syntax X'hh' is used to denote the value of the single octet in that
+   field. When the word 'Variable' is used, it indicates that the
+   corresponding field has a variable length defined either by an
+   associated (one or two octet) length field, or by a data type field.
+
+   The client connects to the server, and sends a version
+   identifier/method selection message:
+
+                   +----+----------+----------+
+                   |VER | NMETHODS | METHODS  |
+                   +----+----------+----------+
+                   | 1  |    1     | 1 to 255 |
+                   +----+----------+----------+
+
+   The VER field is set to X'05' for this version of the protocol.  The
+   NMETHODS field contains the number of method identifier octets that
+   appear in the METHODS field.
+
+   The server selects from one of the methods given in METHODS, and
+   sends a METHOD selection message:
+
+                         +----+--------+
+                         |VER | METHOD |
+                         +----+--------+
+                         | 1  |   1    |
+                         +----+--------+
+
+   If the selected METHOD is X'FF', none of the methods listed by the
+   client are acceptable, and the client MUST close the connection.
+
+   The values currently defined for METHOD are:
+
+          o  X'00' NO AUTHENTICATION REQUIRED
+          o  X'01' GSSAPI
+          o  X'02' USERNAME/PASSWORD
+          o  X'03' to X'7F' IANA ASSIGNED
+          o  X'80' to X'FE' RESERVED FOR PRIVATE METHODS
+          o  X'FF' NO ACCEPTABLE METHODS
+
+   The client and server then enter a method-specific sub-negotiation.
+
+
+
+Leech, et al                Standards Track                     [Page 3]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   Descriptions of the method-dependent sub-negotiations appear in
+   separate memos.
+
+   Developers of new METHOD support for this protocol should contact
+   IANA for a METHOD number.  The ASSIGNED NUMBERS document should be
+   referred to for a current list of METHOD numbers and their
+   corresponding protocols.
+
+   Compliant implementations MUST support GSSAPI and SHOULD support
+   USERNAME/PASSWORD authentication methods.
+
+4.  Requests
+
+   Once the method-dependent subnegotiation has completed, the client
+   sends the request details.  If the negotiated method includes
+   encapsulation for purposes of integrity checking and/or
+   confidentiality, these requests MUST be encapsulated in the method-
+   dependent encapsulation.
+
+   The SOCKS request is formed as follows:
+
+        +----+-----+-------+------+----------+----------+
+        |VER | CMD |  RSV  | ATYP | DST.ADDR | DST.PORT |
+        +----+-----+-------+------+----------+----------+
+        | 1  |  1  | X'00' |  1   | Variable |    2     |
+        +----+-----+-------+------+----------+----------+
+
+     Where:
+
+          o  VER    protocol version: X'05'
+          o  CMD
+             o  CONNECT X'01'
+             o  BIND X'02'
+             o  UDP ASSOCIATE X'03'
+          o  RSV    RESERVED
+          o  ATYP   address type of following address
+             o  IP V4 address: X'01'
+             o  DOMAINNAME: X'03'
+             o  IP V6 address: X'04'
+          o  DST.ADDR       desired destination address
+          o  DST.PORT desired destination port in network octet
+             order
+
+   The SOCKS server will typically evaluate the request based on source
+   and destination addresses, and return one or more reply messages, as
+   appropriate for the request type.
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 4]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+5.  Addressing
+
+   In an address field (DST.ADDR, BND.ADDR), the ATYP field specifies
+   the type of address contained within the field:
+
+          o  X'01'
+
+   the address is a version-4 IP address, with a length of 4 octets
+
+          o  X'03'
+
+   the address field contains a fully-qualified domain name.  The first
+   octet of the address field contains the number of octets of name that
+   follow; there is no terminating NUL octet.
+
+          o  X'04'
+
+   the address is a version-6 IP address, with a length of 16 octets.
+
+6.  Replies
+
+   The SOCKS request information is sent by the client as soon as it has
+   established a connection to the SOCKS server, and completed the
+   authentication negotiations.  The server evaluates the request, and
+   returns a reply formed as follows:
+
+        +----+-----+-------+------+----------+----------+
+        |VER | REP |  RSV  | ATYP | BND.ADDR | BND.PORT |
+        +----+-----+-------+------+----------+----------+
+        | 1  |  1  | X'00' |  1   | Variable |    2     |
+        +----+-----+-------+------+----------+----------+
+
+     Where:
+
+          o  VER    protocol version: X'05'
+          o  REP    Reply field:
+             o  X'00' succeeded
+             o  X'01' general SOCKS server failure
+             o  X'02' connection not allowed by ruleset
+             o  X'03' Network unreachable
+             o  X'04' Host unreachable
+             o  X'05' Connection refused
+             o  X'06' TTL expired
+             o  X'07' Command not supported
+             o  X'08' Address type not supported
+             o  X'09' to X'FF' unassigned
+          o  RSV    RESERVED
+          o  ATYP   address type of following address
+
+
+
+Leech, et al                Standards Track                     [Page 5]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+             o  IP V4 address: X'01'
+             o  DOMAINNAME: X'03'
+             o  IP V6 address: X'04'
+          o  BND.ADDR       server bound address
+          o  BND.PORT       server bound port in network octet order
+
+   Fields marked RESERVED (RSV) must be set to X'00'.
+
+   If the chosen method includes encapsulation for purposes of
+   authentication, integrity and/or confidentiality, the replies are
+   encapsulated in the method-dependent encapsulation.
+
+CONNECT
+
+   In the reply to a CONNECT, BND.PORT contains the port number that the
+   server assigned to connect to the target host, while BND.ADDR
+   contains the associated IP address.  The supplied BND.ADDR is often
+   different from the IP address that the client uses to reach the SOCKS
+   server, since such servers are often multi-homed.  It is expected
+   that the SOCKS server will use DST.ADDR and DST.PORT, and the
+   client-side source address and port in evaluating the CONNECT
+   request.
+
+BIND
+
+   The BIND request is used in protocols which require the client to
+   accept connections from the server.  FTP is a well-known example,
+   which uses the primary client-to-server connection for commands and
+   status reports, but may use a server-to-client connection for
+   transferring data on demand (e.g. LS, GET, PUT).
+
+   It is expected that the client side of an application protocol will
+   use the BIND request only to establish secondary connections after a
+   primary connection is established using CONNECT.  It is expected that
+   a SOCKS server will use DST.ADDR and DST.PORT in evaluating the BIND
+   request.
+
+   Two replies are sent from the SOCKS server to the client during a
+   BIND operation.  The first is sent after the server creates and binds
+   a new socket.  The BND.PORT field contains the port number that the
+   SOCKS server assigned to listen for an incoming connection.  The
+   BND.ADDR field contains the associated IP address.  The client will
+   typically use these pieces of information to notify (via the primary
+   or control connection) the application server of the rendezvous
+   address.  The second reply occurs only after the anticipated incoming
+   connection succeeds or fails.
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 6]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   In the second reply, the BND.PORT and BND.ADDR fields contain the
+   address and port number of the connecting host.
+
+UDP ASSOCIATE
+
+   The UDP ASSOCIATE request is used to establish an association within
+   the UDP relay process to handle UDP datagrams.  The DST.ADDR and
+   DST.PORT fields contain the address and port that the client expects
+   to use to send UDP datagrams on for the association.  The server MAY
+   use this information to limit access to the association.  If the
+   client is not in possession of the information at the time of the UDP
+   ASSOCIATE, the client MUST use a port number and address of all
+   zeros.
+
+   A UDP association terminates when the TCP connection that the UDP
+   ASSOCIATE request arrived on terminates.
+
+   In the reply to a UDP ASSOCIATE request, the BND.PORT and BND.ADDR
+   fields indicate the port number/address where the client MUST send
+   UDP request messages to be relayed.
+
+Reply Processing
+
+   When a reply (REP value other than X'00') indicates a failure, the
+   SOCKS server MUST terminate the TCP connection shortly after sending
+   the reply.  This must be no more than 10 seconds after detecting the
+   condition that caused a failure.
+
+   If the reply code (REP value of X'00') indicates a success, and the
+   request was either a BIND or a CONNECT, the client may now start
+   passing data.  If the selected authentication method supports
+   encapsulation for the purposes of integrity, authentication and/or
+   confidentiality, the data are encapsulated using the method-dependent
+   encapsulation.  Similarly, when data arrives at the SOCKS server for
+   the client, the server MUST encapsulate the data as appropriate for
+   the authentication method in use.
+
+7.  Procedure for UDP-based clients
+
+   A UDP-based client MUST send its datagrams to the UDP relay server at
+   the UDP port indicated by BND.PORT in the reply to the UDP ASSOCIATE
+   request.  If the selected authentication method provides
+   encapsulation for the purposes of authenticity, integrity, and/or
+   confidentiality, the datagram MUST be encapsulated using the
+   appropriate encapsulation.  Each UDP datagram carries a UDP request
+   header with it:
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 7]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+      +----+------+------+----------+----------+----------+
+      |RSV | FRAG | ATYP | DST.ADDR | DST.PORT |   DATA   |
+      +----+------+------+----------+----------+----------+
+      | 2  |  1   |  1   | Variable |    2     | Variable |
+      +----+------+------+----------+----------+----------+
+
+     The fields in the UDP request header are:
+
+          o  RSV  Reserved X'0000'
+          o  FRAG    Current fragment number
+          o  ATYP    address type of following addresses:
+             o  IP V4 address: X'01'
+             o  DOMAINNAME: X'03'
+             o  IP V6 address: X'04'
+          o  DST.ADDR       desired destination address
+          o  DST.PORT       desired destination port
+          o  DATA     user data
+
+   When a UDP relay server decides to relay a UDP datagram, it does so
+   silently, without any notification to the requesting client.
+   Similarly, it will drop datagrams it cannot or will not relay.  When
+   a UDP relay server receives a reply datagram from a remote host, it
+   MUST encapsulate that datagram using the above UDP request header,
+   and any authentication-method-dependent encapsulation.
+
+   The UDP relay server MUST acquire from the SOCKS server the expected
+   IP address of the client that will send datagrams to the BND.PORT
+   given in the reply to UDP ASSOCIATE.  It MUST drop any datagrams
+   arriving from any source IP address other than the one recorded for
+   the particular association.
+
+   The FRAG field indicates whether or not this datagram is one of a
+   number of fragments.  If implemented, the high-order bit indicates
+   end-of-fragment sequence, while a value of X'00' indicates that this
+   datagram is standalone.  Values between 1 and 127 indicate the
+   fragment position within a fragment sequence.  Each receiver will
+   have a REASSEMBLY QUEUE and a REASSEMBLY TIMER associated with these
+   fragments.  The reassembly queue must be reinitialized and the
+   associated fragments abandoned whenever the REASSEMBLY TIMER expires,
+   or a new datagram arrives carrying a FRAG field whose value is less
+   than the highest FRAG value processed for this fragment sequence.
+   The reassembly timer MUST be no less than 5 seconds.  It is
+   recommended that fragmentation be avoided by applications wherever
+   possible.
+
+   Implementation of fragmentation is optional; an implementation that
+   does not support fragmentation MUST drop any datagram whose FRAG
+   field is other than X'00'.
+
+
+
+Leech, et al                Standards Track                     [Page 8]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   The programming interface for a SOCKS-aware UDP MUST report an
+   available buffer space for UDP datagrams that is smaller than the
+   actual space provided by the operating system:
+
+          o  if ATYP is X'01' - 10+method_dependent octets smaller
+          o  if ATYP is X'03' - 262+method_dependent octets smaller
+          o  if ATYP is X'04' - 20+method_dependent octets smaller
+
+8.  Security Considerations
+
+   This document describes a protocol for the application-layer
+   traversal of IP network firewalls.  The security of such traversal is
+   highly dependent on the particular authentication and encapsulation
+   methods provided in a particular implementation, and selected during
+   negotiation between SOCKS client and SOCKS server.
+
+   Careful consideration should be given by the administrator to the
+   selection of authentication methods.
+
+9.  References
+
+   [1] Koblas, D., "SOCKS", Proceedings: 1992 Usenix Security Symposium.
+
+Author's Address
+
+       Marcus Leech
+       Bell-Northern Research Ltd
+       P.O. Box 3511, Stn. C,
+       Ottawa, ON
+       CANADA K1Y 4H7
+
+       Phone: (613) 763-9145
+       EMail: mleech@bnr.ca
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 9]
+
+

+Html markup produced by rfcmarkup 1.108, available from +http://tools.ietf.org/tools/rfcmarkup/ + + diff --git a/scratch/remoting/socks5.ws/ayepromise.js b/scratch/remoting/socks5.ws/ayepromise.js new file mode 100644 index 0000000..e245b6b --- /dev/null +++ b/scratch/remoting/socks5.ws/ayepromise.js @@ -0,0 +1,180 @@ +// UMD header +(function (root, factory) { + if (typeof define === 'function' && define.amd) { + define(factory); + } else if (typeof exports === 'object') { + module.exports = factory(); + } else { + root.ayepromise = factory(); + } +}(this, function () { + 'use strict'; + + var ayepromise = {}; + + /* Wrap an arbitrary number of functions and allow only one of them to be + executed and only once */ + var once = function () { + var wasCalled = false; + + return function wrapper(wrappedFunction) { + return function () { + if (wasCalled) { + return; + } + wasCalled = true; + wrappedFunction.apply(null, arguments); + }; + }; + }; + + var getThenableIfExists = function (obj) { + // Make sure we only access the accessor once + var then = obj && obj.then; + + if (obj !== null && + typeof obj === "object" && + typeof then === "function") { + + return then.bind(obj); + } + }; + + var doChainCall = function (defer, func, value) { + setTimeout(function () { + var returnValue; + try { + returnValue = func(value); + } catch (e) { + defer.reject(e); + return; + } + + if (returnValue === defer.promise) { + defer.reject(new TypeError('Cannot resolve promise with itself')); + } else { + defer.resolve(returnValue); + } + }, 1); + }; + + var doFulfillCall = function (defer, onFulfilled, value) { + if (onFulfilled && onFulfilled.call) { + doChainCall(defer, onFulfilled, value); + } else { + defer.resolve(value); + } + }; + + var doRejectCall = function (defer, onRejected, value) { + if (onRejected && onRejected.call) { + doChainCall(defer, onRejected, value); + } else { + defer.reject(value); + } + }; + + var aCallChainLink = function (onFulfilled, onRejected) { + 
var defer = ayepromise.defer(); + return { + promise: defer.promise, + callFulfilled: function (value) { + doFulfillCall(defer, onFulfilled, value); + }, + callRejected: function (value) { + doRejectCall(defer, onRejected, value); + } + }; + }; + + // States + var PENDING = 0, + FULFILLED = 1, + REJECTED = 2; + + ayepromise.defer = function () { + var state = PENDING, + outcome, + callbacks = []; + + var doFulfill = function (value) { + state = FULFILLED; + outcome = value; + + callbacks.forEach(function (link) { + link.callFulfilled(outcome); + }); + }; + + var doReject = function (error) { + state = REJECTED; + outcome = error; + + callbacks.forEach(function (link) { + link.callRejected(outcome); + }); + }; + + var executeResultHandlerDirectlyIfStateNotPendingAnymore = function (link) { + if (state === FULFILLED) { + link.callFulfilled(outcome); + } else if (state === REJECTED) { + link.callRejected(outcome); + } + }; + + var registerResultHandler = function (onFulfilled, onRejected) { + var link = aCallChainLink(onFulfilled, onRejected); + + callbacks.push(link); + + executeResultHandlerDirectlyIfStateNotPendingAnymore(link); + + return link.promise; + }; + + var safelyResolveThenable = function (thenable) { + // Either fulfill, reject or reject with error + var onceWrapper = once(); + try { + thenable( + onceWrapper(transparentlyResolveThenablesAndFulfill), + onceWrapper(doReject) + ); + } catch (e) { + onceWrapper(doReject)(e); + } + }; + + var transparentlyResolveThenablesAndFulfill = function (value) { + var thenable; + + try { + thenable = getThenableIfExists(value); + } catch (e) { + doReject(e); + return; + } + + if (thenable) { + safelyResolveThenable(thenable); + } else { + doFulfill(value); + } + }; + + var onceWrapper = once(); + return { + resolve: onceWrapper(transparentlyResolveThenablesAndFulfill), + reject: onceWrapper(doReject), + promise: { + then: registerResultHandler, + fail: function (onRejected) { + return registerResultHandler(null, 
onRejected); + } + } + }; + }; + + return ayepromise; +})); diff --git a/scratch/remoting/socks5.ws/socks5.js b/scratch/remoting/socks5.ws/socks5.js new file mode 100644 index 0000000..bb3d342 --- /dev/null +++ b/scratch/remoting/socks5.ws/socks5.js @@ -0,0 +1,404 @@ +/* socks5.js + * + * Partially implements the SOCKS5 protocol over WebSockets + * The missing parts are around the types of authentication: + * does NOT support GSSAPI, [and has no means for plugging in new ]. + * + * Depends on WebSocketStream (which depends on ayepromise). + */ + +/* TODO + * + * + * [ ] We *should* be sending binary frames (i.e. have websocket frame opcode 0x2 i.e. use Blob()s) + * The SOCKS headers are binary, except for the DOMAINNAME address type, which is presumably ASCII. But after the SOCKS headers, we should get out of the way and use whatever form the user or remote end hands us (getting this right will mostly be an exercise in getting unit tests right) + * [ ] We should be able to parse and unparse IP addresses + since SOCKS transfers them in plain bytes, and the + header we send depends on the format we send the address in. + * [ ] Instead of using Strings everywhere, use integer enum codes and have a single function that does conversion between ints and (network ordered!) byte strings + * [ ] factor the common parts of the protocol in some way that allows implementing all three kinds of APIs is feasible + * [ ] unit tests!! 
+ */ + +// UMD header +(function (root, factory) { + if (typeof define === 'function' && define.amd) { + define(factory); + } else if (typeof exports === 'object') { + module.exports = factory(); + } else { + root.SOCKS5 = factory(); + } +}(this, function () { + 'use strict'; + +var WebSocketStream = require("./websocketstream.js") + + +/* Utility Routines */ + +function split_addr(addr) { + // I would rather use the more general 'rsplit()' + // but js doesn't have that and writing it myself is fraught: + //http://stackoverflow.com/questions/958908/how-do-you-reverse-a-string-in-place-in-javascript/16776621#16776621 + + var split = addr.lastIndexOf(":") + var host = addr.slice(0, split) + var port = addr.slice(split+1) + return [host, port] +} + +function join_addr(host, port) { + return host + ":" + port; +} + + +function build_int(n) { + if(!(0 <= n && n<=0xFF)) throw "Integer out of range. SOCKS integers must fit in one byte" + return String.fromCharCode(n); +} + + +function SOCKS5(proxy, target, user, pass) { + /* implements the SOCKS5 client protocol + * reference: https://tools.ietf.org/html/rfc1928 + * + * This class only implements TCP CONNECT SOCKS; SOCKS also allows UDP and even BIND modes, but those types require distinctly different APIs (UDP needs .send() and .onmessage; BIND only allows one remote TCP connection (i.e. 
it's not a full listen()+accept() implementation) but presumably there should be an intermediate state for "connected to the proxy but no one is connected to us" + + */ + + //A) initializations + + var self = this; + self.target = target + self.remote = { host: null, port: null } //the address of the remote end of the tunnel + + self._ws = new WebSocketStream(proxy) + + self._ws.onopen = function() { + self._connect() + } + + self._ws.onclose = function() { + self.onclose() + } + self._ws.onerror = function() { + self.onerror() + } +} + +// make SOCKS5 inherit from WebSocketStream +// trick from http://ncombo.wordpress.com/2013/07/11/javascript-inheritance-done-right/ +//SOCKS5.prototype = Object.create(WebSocketStream.prototype); + + +SOCKS5.prototype._validate_version = function(b) { + if(b != this.VERSION) { + throw "Unsupported SOCKS version" + } +} + + +// B) connect to the SOCKS server + +SOCKS5.prototype._connect = function() { + var self = this; + return this._negotiate_method() + .then(function(m) { return self._negotiate_auth(m) }) //the wrapping is because then() suffers from changing what 'this' is + .then(function() { return self._negotiate_connection() } ) + .then(function() { return self.onopen({/*XXX fill me in*/}) }) + .fail(function(e) { self.onerror(e) }) //chain exceptions out to the event handler +} + + + +// 1) negotiate an connection method (i.e. an auth method, though encryption and digital signatures are theoretically an option here too); + +SOCKS5.prototype._negotiate_method = function() { + this._ws.send(this._build_method_selection([this.auth.NONE])) + + return this._read_method(); +} + + + +SOCKS5.prototype._build_method_selection = function(methods) { + // precondition: methods is a subset of this.auth + // XXX this precondition isn't enforced! 
+ + var nmethods = methods.length; + + return this.VERSION + build_int(nmethods) + methods.join('') +} + + +SOCKS5.prototype._read_method = function() { + var ws = this._ws; + var self = this; + return ws.recv(1) + .then(function(b) { self._validate_version(b) }) + .then(function() { return ws.recv(1); }) +} + + +// 2) negotiate the authentication; +// the spec says we "MUST" support GSSAPI, which I'm not going +// to do, and "SHOULD" support user/pass, which I am. + + +SOCKS5.prototype._negotiate_auth = function(method) { + var self = this; + // look up a handler for 'method' + // points: this handler may return a promise, but it also might not + // this handler is run with this = [the SOCKS5] + if(method == this.auth.UNACCEPTABLE) { + + //"If the selected METHOD is X'FF', none of the methods listed by the + // client are acceptable, and the client MUST close the connection." + this._ws.close(); //"client MUST close" + + //and we error out too, for good measure + throw "SOCKS server rejected all our auth methods." + } + + function find_method() { + var s = null; + // .find() didn't work. for(k in self.auth) didn't work. + // maybe things are different under Firefox?? + // So I fall back to using a global 's' + Object.keys(self.auth).forEach(function(k) { //XXX this code feels like it probably exists in the stdlib somewhere + if(self.auth[k].charCodeAt(0) == method.charCodeAt(0)) s = k; + }) + + if(s) return s; + throw "Unknown auth method" //Shouldn't happen but not impossible; a conforming server should only respond with a method in the list we sent + } + + var handler = this.authmethods[find_method()] + + // Note! handler might here overwrite this._ws here with a further + // wrapper because: + // > If the negotiated method includes encapsulation [...] + // > these requests MUST be encapsulated in the method- + // > dependent encapsulation. 
+ // - + return handler.call(this) +} + + +SOCKS5.prototype.authmethods = {} +SOCKS5.prototype.authmethods.NONE = function() { + return; +} + +SOCKS5.prototype.authmethods.GSSAPI = function() { + throw "NotImplemented" +} +SOCKS5.prototype.authmethods.LOGIN = function() { + throw "NotImplemented" + //this.send(....) + //return ws.read(loginresponselength).then() .... +} + + +// 3) request the actual tunnel +SOCKS5.prototype._negotiate_connection = function() { + + this._ws.send(this._build_request("CONNECT", this.target)) //hardcoded to "CONNECT"; see the comments near the top + + return this._read_reply(); +} + + +SOCKS5.prototype._build_request = function(command, address) { + command = command.toUpperCase(); + + command = this.commands[command] + if(command === undefined) { + throw "Invalid SOCKS command." + } + + // XXX for now, address is hardcoded as DOMAINAME + // most DNS resolvers should be able to handle text-formatted IPv4 and IPv6 addresses... + var atype = this.atype.DOMAINNAME + + address = split_addr(address) + var host = address[0] + var port = address[1] + + // format address as a fortran-style string + if(host.length > 0xFF) { + throw "Target hostname too long to encode." + } + var n = String.fromCharCode(host.length) + host = n + host + + // and finally, the port + if(port === null) { //XXX maybe this should be inside of spit_addr + throw "Target port must be specified when using SOCKS5." 
+ } + + // "in network octet order" + port = +port; //convert to an integer + port = String.fromCharCode((port & 0xFF00) >> 8) + String.fromCharCode(port & 0xFF) + + + var m = this.VERSION + command + this.RSV + atype + host + port; + return m +} + +SOCKS5.prototype._read_reply = function() { + // As a state machine, this process is: + // [ read version ] -> [ read response ] -> [read reserved null byte] -> [error out] + // |-> [error out] | + // v + // [read address type] + // [read ipv4] [read domainname] [readipv6] + // [read port] + // + // Because the message is a fixed size up to reading the address, I avoiding having to kludge + // around this by just saying .recv(4) and then using standard if statements instead of a chain of .recv(1)s + // but because this step comes after the split, it needs to + + + //the trick here is that promises chain: .then() records the handler you pass it and then returns a new promise which will be fired after that promise completes and finishes the handler + // how do I write branching with promises? + // Promises/A+ makes it easy enough to write a chain of steps and + // get async almost free (the only expense is some repetition: .then().then().then()....) + // MSFT even has an excellent doc on doing this: http://msdn.microsoft.com/en-us/library/windows/apps/Hh700334.aspx + // + // In principle, you should be able to have a promise that represents + // the final result of a branching + // How to express this is escaping me at the moment. + + //NB: the non-lint'd indenting is on purpose here! + // The correct indents would distract, because the .then()s + // are basically boilerplate around the real process. 
+ + var self = this; + var ws = this._ws; + + // check the remote server version + return ws.recv(1) + .then(function(b) { return self._validate_version(b) }) + + // parse the response type + .then(function() { return ws.recv(1) }) + .then(function(b) { + if(b != self.responses.OK) { + throw "SOCKS tunnel refused" + //TODO: give more detailed error message based on what b is + } + }) + + // check that the 'reserved' byte is actually unused; + // if it's not, we might be not talking to SOCKS + .then(function() { return ws.recv(1) }) // NB: Promises/A+ says that you can chain promises: http://promisesaplus.com/#point-49 + .then(function(b) { + if(b != self.RSV ) { + throw "Malformed SOCKS reply" + } + }) + + + // determine the length of the next field, which is "bind.addr", + // telling us what our remote host is + .then(function() { return ws.recv(1) }) + .then(function(b) { + switch(b) { + case self.atype.IPv4: //ipv4: 4 bytes + return ws.recv(4).then(function(addr) { + //TODO: parse the bytes into a IP string + self.remote.host = addr; + }) + break; + case self.atype.DOMAINNAME: // domain name: a fortran-style string (so we need to read 1 byte to find out the length) + return ws.recv(1).then(function(h) { + h = h.charCodeAt(0) //extract the number of bytes to read + ws.recv(h).then(function(addr) { + //the string as given is a string + self.remote.host = addr; + }) + }) + break; + case self.atype.IPv6: //ipv6: 16 bytes + return ws.recv(16).then(function(addr) { + //TODO: parse the octets into a string + self.remote.host = addr; + }) + break; + default: + throw "Received unknown address type"; + } + }) + + // finally, read the port + .then(function() { return ws.recv(2) }) + .then(function(b) { + self.remote.port = b.charCodeAt(0) << 8 | b.charCodeAt(1) + }) + +} + + +// C) Get out of the way: just forward packets +// + +SOCKS5.prototype.recv = function(n) { + // XXX I don't think I'm doing this right + // TODO: ensure that recv() on the outer can be called before 
the SOCKS negotiation is done + // it should block until + // XXX as written this could interfere with the SOCKS negotiation and totally screw up everything + + return this._ws.recv(n) +} + +SOCKS5.prototype.recvline = function() { + // XXX I don't think I'm doing this right + // TODO: ensure that recv() on the outer can be called before the SOCKS negotiation is done + + return this._ws.recvline() +} + +SOCKS5.prototype.send = function(d) { + return this._ws.send(d) +} + + +SOCKS5.prototype.onopen = function(evt) {} +SOCKS5.prototype.onclose = function(evt) {} +SOCKS5.prototype.onerror = function(evt) {} + +// These constants are hardcoded to correspond to their encoding within the protocol +// SOCKS is simple enough that the constants it uses are all single bytes. +SOCKS5.prototype.VERSION = String.fromCharCode(5) // i.e. SOCKS version 5 +SOCKS5.prototype.RSV = String.fromCharCode(0) //RESERVED byte + +SOCKS5.prototype.auth = { + NONE: String.fromCharCode(0), + GSSAPI: String.fromCharCode(1), + LOGIN: String.fromCharCode(2), + UNACCEPTABLE: String.fromCharCode(0xFF), + // all others are reserved either by IANA or for custom use + // i.e. probably no one uses them(?) 
+ } + + +SOCKS5.prototype.commands = {CONNECT: String.fromCharCode(1), + BIND: String.fromCharCode(2), + UDP: String.fromCharCode(3)} + +SOCKS5.prototype.atype = { + IPv4: String.fromCharCode(1), + DOMAINNAME: String.fromCharCode(3), + IPv6: String.fromCharCode(4) + } + + +SOCKS5.prototype.responses = {OK: String.fromCharCode(0), + //TODO: there's 8 possible errors + } + + +return SOCKS5; +})); \ No newline at end of file diff --git a/scratch/remoting/socks5.ws/test.socks5.js b/scratch/remoting/socks5.ws/test.socks5.js new file mode 100644 index 0000000..c29bf29 --- /dev/null +++ b/scratch/remoting/socks5.ws/test.socks5.js @@ -0,0 +1,46 @@ + +var SOCKS5 = require("./socks5.js") + + + +process.argv.shift()//this removes "/usr/bin/node" and should be a standard part of the startup; it is for python + + +var target = process.argv[1] +if(!target) { + target = "uwaterloo.ca:80" +} + +var prx = new SOCKS5("ws://localhost:8081", target) + +prx.onopen = function() { + console.log("Requesting HTTP from ", target) + prx.send("GET /jam HTTP/1.1\r\n\r\n") + + prx.recv().then(function(e) { + console.log("Response:") + console.log(e); + }) +} + +prx.onerror = function(e) { + console.log("It blew up!") + console.log(e) +} + + +/* +or proxy via + +var prx = new SOCKS5("ws://proxy-host.com:3535", "tor.kousu.ca:22") + +and prx is simply + +// or with our (currently imaginary) SSH client library: +var ssh = new SSH(prx) +ssh.onconnect = function(e) { + ssh.login("user", "pass") +} +ssh.onlogin = function(e* + +*/ diff --git a/scratch/remoting/socks5.ws/test.websocketstream.js b/scratch/remoting/socks5.ws/test.websocketstream.js new file mode 100644 index 0000000..a071817 --- /dev/null +++ b/scratch/remoting/socks5.ws/test.websocketstream.js @@ -0,0 +1,11 @@ + +// run websockify 8081 --unix-target=/tmp/ab & nc -vUl /tmp/ab to construct a server +var WebSocketStream = require("./websocketstream.js") +var ws = new WebSocketStream("ws://localhost:8081") +ws.onclose = function() { 
console.log("closed") } + +ws.recv(6).then(function(d) { + // NOTE: WebSocketStream by default passes text + // I am unsure if . It is probably up to the websocket library. + console.log("read these 6 characters: ", d) +}) diff --git a/scratch/remoting/socks5.ws/test.websocketstream.socks5.js b/scratch/remoting/socks5.ws/test.websocketstream.socks5.js new file mode 100644 index 0000000..36d7c6a --- /dev/null +++ b/scratch/remoting/socks5.ws/test.websocketstream.socks5.js @@ -0,0 +1,27 @@ + +// run websockify 8081 --unix-target=/tmp/ab & nc -vUl /tmp/ab to construct a server +var WebSocketStream = require("./websocketstream.js") +var ws = new WebSocketStream("ws://localhost:8081") +ws.onclose = function() { console.log("closed") } + +ws.onopen = function() { + +ws.send("\x05\x01\x00") +ws.recv(2).then(function(d) { + // NOTE: WebSocketStream by default passes text + // I am unsure if . It is probably up to the websocket library. + console.log("read these: ", d.charCodeAt(0), d.charCodeAt(1)) +}).then(function() { + ws.send("\x05\x01\x00\x03\x0Cuwaterloo.ca\x00\x50") +}).then(function() { return ws.recv(4) }) +.then(function(d) { + console.log("read these: ", d.charCodeAt(0), d.charCodeAt(1), d.charCodeAt(2), d.charCodeAt(3)) + ws.send("GET / HTTP/1.1\r\n\r\n") +}).then(function() { + return ws.recv() +}).then(function(page) { + console.log(page) +}) + + +} diff --git a/scratch/remoting/socks5.ws/websocketstream.js b/scratch/remoting/socks5.ws/websocketstream.js new file mode 100644 index 0000000..f75ab2d --- /dev/null +++ b/scratch/remoting/socks5.ws/websocketstream.js @@ -0,0 +1,186 @@ +// UMD header +(function (root, factory) { + if (typeof define === 'function' && define.amd) { + define(factory); + } else if (typeof exports === 'object') { + module.exports = factory(); + } else { + root.WebSocketStream = factory(); + } +}(this, function () { +'use strict'; + +if(!WebSocket) { + var WebSocket = require("ws"); +} +var ayepromise = require("./ayepromise.js") + +/* 
+this class handles the buffering that WebSocket doesn't +providing the recv() method familiar from synchronous socket code. +Since it returns promises that conform to Promises/A+, the resulting code +is very nearly the same as the equivalent synchronous code. +[[aside: promises are continuation-passing-style indirected]] + +It has the same (XXX not finished) API as WebSocket, except that + .onmessage = handler is replaced by .recv(n, handler) and .recvline(handler) +TODO: is there any way to achieve this with inheritence? can I inherit and somehow zero out onmessage externally while still using onmessage internally? + +.recvline() blocks until '\n' comes in +.recv(n) blocks until n bytes are ready +.recv() buffer until websocket closes +NOTE: only one of these can be active at a time; if you try to call recv() while a recv() is pending you will get an exception + +*/ + +/* TODO: + * + * [ ] do we want perhaps to add a nonblocking flag so that recv() is EITHER blocking or immediate polling? but the only way to deal with nonblocking sockets is polling, and we probably want to discourage polling in javascript... + + * [ ] handle the different modes you can open a websocket in: strings, base64, and binary; + The websocket protocol technically allows intermixing text and binary (and base64 is a websockify addition) frame by frame. Internally, this means we need to our buffer to be tolerate receiving in all three modes. + Sending is not as much of a problem, because we just relay that down to WebSocket itself, and it knows how to handle the differences, though perhaps a base64 mode would help. + + * [x] WebSockify, which I'm testing against (and which this is mostly useful with) insists on me specifying "binary" or "base64" in the protocols argument. + [ ] Make sure upstream accepts my patch + [ ] Support the "protocols" option somehow. 
+*/ + + +// XXX a complication: +// +// with 'binary' e.data ends up as a Blob which requires a whole slew of JS objects to manipulate: FileReader and ArrayBuffer. FileReader.readAsBinaryString + FileReader.result is the shortest way to just getting the bytes out, however readAsBinaryString has been deprecated for 2 years. +// readAsText assumes the data is UTF-8 (or otherwise specified) text and will decode as approproate +// readAsArrayBuffer just pushes the trouble of manipulation to learning the ArrayBuffer class +// +// For development, I'm going to force base64. I don't know if non-WebSockify WebSocketServers will understand it, but I can cross that bridge later. + +function WebSocketStream(addr) { + var self = this; //so that we can refer to the WebSocketStream from the WebSocket's event handlers + + this._buffer = ""; //TODO: use something more efficient than strings here ( see https://github.com/phoboslab/jsmpeg/blob/master/jsmpg.js#L90 for how ) + + this._pending = null; + + this._ws = new WebSocket(addr, "binary"); + + this._ws.onmessage = function(e) { + self._pushbuffer(e.data); + } + + // these handlers which proxy need to be defined in here + // in order to pick up 'self' and not cause infinite recurision + // saying .e.g this._ws.onopen = this._onopen does the wrong thing, because js doesn't care that this._onopen comes from this, it runs it with this=this._ws + this._ws.onopen = function(e) { + if(self.onopen) { + self.onopen(e) + } + } + this._ws.onclose = function(e) { + // clear out a pending .recv() + if(self._pending !== null) { + if(self._pending.type == self._RECV) { + self._pending.deferred.resolve(self._buffer) + } + } + if(self.onclose) { + self.onclose(e) + } + } + this._ws.onerror = function(e) { + if(self.onerror) { + self.onerror(e) + } + } + + +} + +WebSocketStream.prototype._pushbuffer = function(d) { + var self = this; + + self._buffer += d; + //console.log("[",this._buffer.length,"] buffer = ", self._buffer) //DEBUG + + // if we 
have a recv() or a recvline() blocked, scan + if(self._pending !== null) { + var split = -1; // if not -1, determines how many bytes to eat and call handler with, and causing the pending recv to be completed + + if(self._pending.type == self._RECVn) { + if(self._buffer.length >= self._pending.n) { + split = self._pending.n; + } + } else if(self._pending.type == self._RECVLINE) { + split = self._buffer.indexOf("\n") + } + + if(split != -1) { + var d = self._buffer.slice(0, split) + self._buffer = self._buffer.slice(split) + + var p = self._pending; + // "complete" the pend; i.e. null out _pending + self._pending = null; //it's important to complete *before* resolving, + //since the resolution handler might--is likely to, even--call recv() again + p.deferred.resolve(d) + } + } +} + + //TODO: .prototoype._recv = { NONE: 0, ..} +WebSocketStream.prototype._RECVNONE = 0 +WebSocketStream.prototype._RECVn = 1 +WebSocketStream.prototype._RECVLINE = 2 +WebSocketStream.prototype._RECV = 3 //XXX NotImplemented + +WebSocketStream.prototype.send = function(data) { + + if(typeof(data) === "string") { + //Bad Things Happen if we let the websocket library send the data, + // namely it UTF-8 encodes it + //XXX this is wrong! the client code should have control here. + // Maybe it really does want to send UTF8?? 
+ var _data = data; + data = new Uint8Array(_data.length); + for(var i = 0; i Date: Mon, 15 Sep 2014 09:15:48 -0400 Subject: [PATCH 40/41] Add SOCKS5.close() --- scratch/remoting/socks5.ws/socks5.js | 3 +++ scratch/remoting/socks5.ws/test.socks5.js | 4 ++++ scratch/remoting/socks5.ws/websocketstream.js | 3 +++ 3 files changed, 10 insertions(+) diff --git a/scratch/remoting/socks5.ws/socks5.js b/scratch/remoting/socks5.ws/socks5.js index bb3d342..e6bff8d 100644 --- a/scratch/remoting/socks5.ws/socks5.js +++ b/scratch/remoting/socks5.ws/socks5.js @@ -364,6 +364,9 @@ SOCKS5.prototype.send = function(d) { return this._ws.send(d) } +SOCKS5.prototype.close = function() { + return this._ws.close(); +} SOCKS5.prototype.onopen = function(evt) {} SOCKS5.prototype.onclose = function(evt) {} diff --git a/scratch/remoting/socks5.ws/test.socks5.js b/scratch/remoting/socks5.ws/test.socks5.js index c29bf29..1ec67d2 100644 --- a/scratch/remoting/socks5.ws/test.socks5.js +++ b/scratch/remoting/socks5.ws/test.socks5.js @@ -29,6 +29,10 @@ prx.onerror = function(e) { } +setTimeout(function() { + console.log("timeout") + prx.close(); +}, 15*1000) /* or proxy via diff --git a/scratch/remoting/socks5.ws/websocketstream.js b/scratch/remoting/socks5.ws/websocketstream.js index f75ab2d..b596b79 100644 --- a/scratch/remoting/socks5.ws/websocketstream.js +++ b/scratch/remoting/socks5.ws/websocketstream.js @@ -181,6 +181,9 @@ WebSocketStream.prototype.recvline = function() { return promise; } +WebSocketStream.prototype.close = function() { + return this._ws.close(); +} return WebSocketStream; })); \ No newline at end of file From f8b06a813a5fab6b70598c71cfc28c4ae441aa03 Mon Sep 17 00:00:00 2001 From: kousu Date: Mon, 15 Sep 2014 09:20:31 -0400 Subject: [PATCH 41/41] Tidy depends --- scratch/remoting/socks5.ws/README.md | 49 +++++ scratch/remoting/socks5.ws/ayepromise.js | 180 ------------------ scratch/remoting/socks5.ws/socks5.js | 7 +- scratch/remoting/socks5.ws/test.socks5.js | 2 +- 
scratch/remoting/socks5.ws/websocketstream.js | 4 +- 5 files changed, 57 insertions(+), 185 deletions(-) create mode 100644 scratch/remoting/socks5.ws/README.md delete mode 100644 scratch/remoting/socks5.ws/ayepromise.js diff --git a/scratch/remoting/socks5.ws/README.md b/scratch/remoting/socks5.ws/README.md new file mode 100644 index 0000000..9b1eb13 --- /dev/null +++ b/scratch/remoting/socks5.ws/README.md @@ -0,0 +1,49 @@ +SOCKS5 in WebSockets +==================== + + +SOCKS5 is a dead simple little protocol that makes very thin TCP and UDP proxies. +SOCKS is notable because it allows proxying arbitrary TCP and UDP traffic, not just one application protocol. + +This code implements it on top of WebSockets. If you point [websockify](https://github.com/kanaka/websockify/) at a SOCKS proxy, this library lets JavaScript (in node or in the browser) open proxied connections through it. + + +Demo +---- + + +Run a SOCKS server. The quickest is: +``` +$ ssh -D 7777 localhost +``` + +Run websockify in front of that SOCKS server: +``` +$ websockify 8081 localhost:7777 +``` + +Run the socks5.js client: +``` +$ node test.socks5.js google.com:80 +``` + +You probably need to `npm install ws` and `npm install ayepromise` first. + +This gets more interesting if, instead of sshing to localhost in the first step, you ssh somewhere you have a shell account. +Then the demo script will appear to Google (or whoever you hit) to be coming from the system you have a shell account on. + +The same library also works in your browser! + +SOCKS software +-------------- + +Servers: + +* [ssh](http://www.openssh.com/) has a `-D` switch which makes your local machine into a SOCKS proxy, tunneling through to the other end of your ssh session. By default it only accepts connections from the local machine. +* [dante](http://www.inet.no/dante/) is a little more fully featured SOCKS system. +* [tor](http://torproject.org) relies totally on SOCKS to move your traffic off your computer and into the TOR mixnet. + +Clients: + +* Most browsers support SOCKS5. Look under Network Settings in Firefox. 
+* [tsocks](http://tsocks.sourceforge.net/) which was forked to [torsocks](https://code.google.com/p/torsocks/) which wraps any Unix program through a SOCKS proxy. diff --git a/scratch/remoting/socks5.ws/ayepromise.js b/scratch/remoting/socks5.ws/ayepromise.js deleted file mode 100644 index e245b6b..0000000 --- a/scratch/remoting/socks5.ws/ayepromise.js +++ /dev/null @@ -1,180 +0,0 @@ -// UMD header -(function (root, factory) { - if (typeof define === 'function' && define.amd) { - define(factory); - } else if (typeof exports === 'object') { - module.exports = factory(); - } else { - root.ayepromise = factory(); - } -}(this, function () { - 'use strict'; - - var ayepromise = {}; - - /* Wrap an arbitrary number of functions and allow only one of them to be - executed and only once */ - var once = function () { - var wasCalled = false; - - return function wrapper(wrappedFunction) { - return function () { - if (wasCalled) { - return; - } - wasCalled = true; - wrappedFunction.apply(null, arguments); - }; - }; - }; - - var getThenableIfExists = function (obj) { - // Make sure we only access the accessor once - var then = obj && obj.then; - - if (obj !== null && - typeof obj === "object" && - typeof then === "function") { - - return then.bind(obj); - } - }; - - var doChainCall = function (defer, func, value) { - setTimeout(function () { - var returnValue; - try { - returnValue = func(value); - } catch (e) { - defer.reject(e); - return; - } - - if (returnValue === defer.promise) { - defer.reject(new TypeError('Cannot resolve promise with itself')); - } else { - defer.resolve(returnValue); - } - }, 1); - }; - - var doFulfillCall = function (defer, onFulfilled, value) { - if (onFulfilled && onFulfilled.call) { - doChainCall(defer, onFulfilled, value); - } else { - defer.resolve(value); - } - }; - - var doRejectCall = function (defer, onRejected, value) { - if (onRejected && onRejected.call) { - doChainCall(defer, onRejected, value); - } else { - defer.reject(value); - } - }; 
- - var aCallChainLink = function (onFulfilled, onRejected) { - var defer = ayepromise.defer(); - return { - promise: defer.promise, - callFulfilled: function (value) { - doFulfillCall(defer, onFulfilled, value); - }, - callRejected: function (value) { - doRejectCall(defer, onRejected, value); - } - }; - }; - - // States - var PENDING = 0, - FULFILLED = 1, - REJECTED = 2; - - ayepromise.defer = function () { - var state = PENDING, - outcome, - callbacks = []; - - var doFulfill = function (value) { - state = FULFILLED; - outcome = value; - - callbacks.forEach(function (link) { - link.callFulfilled(outcome); - }); - }; - - var doReject = function (error) { - state = REJECTED; - outcome = error; - - callbacks.forEach(function (link) { - link.callRejected(outcome); - }); - }; - - var executeResultHandlerDirectlyIfStateNotPendingAnymore = function (link) { - if (state === FULFILLED) { - link.callFulfilled(outcome); - } else if (state === REJECTED) { - link.callRejected(outcome); - } - }; - - var registerResultHandler = function (onFulfilled, onRejected) { - var link = aCallChainLink(onFulfilled, onRejected); - - callbacks.push(link); - - executeResultHandlerDirectlyIfStateNotPendingAnymore(link); - - return link.promise; - }; - - var safelyResolveThenable = function (thenable) { - // Either fulfill, reject or reject with error - var onceWrapper = once(); - try { - thenable( - onceWrapper(transparentlyResolveThenablesAndFulfill), - onceWrapper(doReject) - ); - } catch (e) { - onceWrapper(doReject)(e); - } - }; - - var transparentlyResolveThenablesAndFulfill = function (value) { - var thenable; - - try { - thenable = getThenableIfExists(value); - } catch (e) { - doReject(e); - return; - } - - if (thenable) { - safelyResolveThenable(thenable); - } else { - doFulfill(value); - } - }; - - var onceWrapper = once(); - return { - resolve: onceWrapper(transparentlyResolveThenablesAndFulfill), - reject: onceWrapper(doReject), - promise: { - then: registerResultHandler, - fail: 
function (onRejected) { - return registerResultHandler(null, onRejected); - } - } - }; - }; - - return ayepromise; -})); diff --git a/scratch/remoting/socks5.ws/socks5.js b/scratch/remoting/socks5.ws/socks5.js index e6bff8d..f3558bc 100644 --- a/scratch/remoting/socks5.ws/socks5.js +++ b/scratch/remoting/socks5.ws/socks5.js @@ -4,7 +4,7 @@ * The missing parts are around the types of authentication: * does NOT support GSSAPI, [and has no means for plugging in new ]. * - * Depends on WebSocketStream (which depends on ayepromise). + * Depends on WebSocketStream (which depends on ayepromise and some WebSocket library). */ /* TODO @@ -32,8 +32,9 @@ }(this, function () { 'use strict'; -var WebSocketStream = require("./websocketstream.js") - +if(!WebSocketStream) { + var WebSocketStream = require("./websocketstream.js") +} /* Utility Routines */ diff --git a/scratch/remoting/socks5.ws/test.socks5.js b/scratch/remoting/socks5.ws/test.socks5.js index 1ec67d2..8410a08 100644 --- a/scratch/remoting/socks5.ws/test.socks5.js +++ b/scratch/remoting/socks5.ws/test.socks5.js @@ -15,7 +15,7 @@ var prx = new SOCKS5("ws://localhost:8081", target) prx.onopen = function() { console.log("Requesting HTTP from ", target) - prx.send("GET /jam HTTP/1.1\r\n\r\n") + prx.send("GET / HTTP/1.1\r\n\r\n") prx.recv().then(function(e) { console.log("Response:") diff --git a/scratch/remoting/socks5.ws/websocketstream.js b/scratch/remoting/socks5.ws/websocketstream.js index b596b79..5011b96 100644 --- a/scratch/remoting/socks5.ws/websocketstream.js +++ b/scratch/remoting/socks5.ws/websocketstream.js @@ -13,7 +13,9 @@ if(!WebSocket) { var WebSocket = require("ws"); } -var ayepromise = require("./ayepromise.js") +if(!ayepromise) { + var ayepromise = require("ayepromise") +} /* this class handles the buffering that WebSocket doesn't