Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
4792e0e
Merge pull request #8 from kousu/master
bruzen Aug 2, 2014
a8ad247
Clean up replicate_server.
kousu Sep 4, 2014
3c78c0c
Debugging why unregister() doesn't get called when clients leave.
kousu Sep 4, 2014
d0aed57
Salvage socat's habit of leaving replicate.py's around.
kousu Sep 4, 2014
26da493
Minor SimulationLog safety checks
kousu Sep 4, 2014
d0582f5
minor clarity
kousu Sep 4, 2014
ff01d36
clean up the postgres startup scripts somewhat; 'init.sh' installs th…
kousu Sep 4, 2014
b4a1a06
Switch to proper js inheritence: Constructor.call(this, ...) + mergin…
kousu Sep 4, 2014
31e466e
Add CacheSet sugar: convenience operators as methods
kousu Sep 4, 2014
9c2c9a4
Switch CacheSet -> Table and SubSet -> Where to emulate SQL
kousu Sep 4, 2014
cd3a679
Cleaning up the psql directory, in prep for merging into main
kousu Sep 4, 2014
df4ea19
Move psql into mainline. Now called src/backend/db
kousu Sep 4, 2014
ac77aa4
Spec the target DB for replication on the command line instead of har…
kousu Sep 4, 2014
9ad6e05
Complete the CacheSet->Table naming transition
kousu Sep 4, 2014
52d8e9e
Add Count(), Sum() and Mean()
kousu Sep 4, 2014
c93f63b
Fixed .delete(); I made a booboo adapting to inheritence.
kousu Sep 4, 2014
8bf086f
Add typecasts to the three reduce types to skirt accidents like "[obj…
kousu Sep 4, 2014
43dd2a3
I guess I might as well import crossfilter to go with PourOver.
kousu Sep 4, 2014
b9185a0
Half of the Distinct() operator
kousu Sep 4, 2014
e4d8020
move auto-sqlit'ing inside of SimulationLog
kousu Sep 4, 2014
12a44e2
buffering experiments, since I want to get IPC working.
kousu Sep 5, 2014
619d2ce
Typo; make the backend socket actually use $TABLE not $table in its name
kousu Sep 6, 2014
0cadb45
Fix my use of 'trap" so that replicate_server kills its children smoo…
kousu Sep 6, 2014
a6c629e
*undo* my forced-cd in thre postgres client.sh
kousu Sep 6, 2014
b0cc7ca
Clean up replicate.py's shutdown conditions
kousu Sep 6, 2014
1c5ad75
switch to just using a global socket for shutdown signalling
kousu Sep 6, 2014
9c97737
Rearrange for readability
kousu Sep 6, 2014
cedcd08
Finally adapt replicate() to match local indentation
kousu Sep 6, 2014
0492346
Make sure `pkill replicate.py` also takes down the socket
kousu Sep 6, 2014
b2c43c6
Defensively unregister clients that go missing
kousu Sep 6, 2014
66f8bbe
Clean up the trigger in unregister()
kousu Sep 6, 2014
2dc6bec
Fall in with the postgres environment variables
kousu Sep 7, 2014
70c1b6b
Update db/ documentation to actually be usable.
kousu Sep 7, 2014
ce3061d
Merge branch 'master' into databased
kousu Sep 7, 2014
775edd3
Table.and() operator.
kousu Sep 10, 2014
66c34d0
More tests (warning: whitebox tests!)
kousu Sep 10, 2014
80b081f
refactor the ugliness that was And() with liberal iterator use.
kousu Sep 11, 2014
533676b
Not operator.
kousu Sep 11, 2014
c036f18
Or() operator
kousu Sep 12, 2014
d2be539
TODOs
kousu Sep 12, 2014
f58c623
SOCKS5 over WebSockets.
kousu Sep 15, 2014
797ea20
Add SOCKS5.close()
kousu Sep 15, 2014
f8b06a8
Tidy depends
kousu Sep 15, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,402 changes: 1,402 additions & 0 deletions assets/libs/crossfilter.js

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions scratch/pipes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
Experimenting with constructing pipelines with subprocess.py

The one big win that shell has over python is that doing "./spool | filter | reduce" is really easy and really efficient: as efficient as the individual programs
But the subprocess module is finicky about doing this sort of thing.

[the docs](https://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline) claim you can build pipelines like this:
```
output=`dmesg | grep hda`
# becomes
p1 = Popen(["dmesg"], stdout=PIPE)
p2 = Popen(["grep", "hda"], stdin=p1.stdout, stdout=PIPE)
p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
output = p2.communicate()[0]
```

But that is misleading: `communicate()` buffers the input between the two processes.

`spool.py` + `s2.py` demonstrates that to get things running in real time, the thing is for the producer
(`spool.py` or `dmesg` or whatever is on the left side of the pipe) to call `flush()` to get things moving.
Yet, running `spool.py` on a terminal gets output immediately.
Why is this? The pipe constructed by python by default has a buffer size of io.DEFAULT_BUFFER_SIZE(==8192).
Frustratingly, setting the 'bufsize' argument to 0 does *not* automatically give unbuffered .stdout, but it does set .stdout to be what would otherwise be .stdout.raw
There was [a patch](http://bugs.python.org/issue11459) to 3.1 and 3.2 that should have fixed it
But you need also the writer to not be doing its own buffering (just because your side of a pipe is unbuffered doesn't mean the other side is)
e.g. [see](http://chase-seibert.github.io/blog/2012/11/16/python-subprocess-asynchronous-read-stdout.html): """you need to make sure that the subprocess you are invoking is not doing its own buffering. It took me a bit to figure out that mysql does do that, which is what the --unbuffered flag is there to disable.""" and [this](http://stackoverflow.com/questions/107705/python-output-buffering)


* [Related technical links](http://bugs.python.org/issue19929) (hidden in a bug report)
* http://objectmix.com/python/383415-working-around-buffering-issues-when-writing-pipes.html

By experiment, the buffer size on my computer seems to be somewhere around 10000000>>4 bytes.
Link above suggests it should be 65536 bytes.

[some systems](http://www.gnu.org/software/libc/manual/html_node/Controlling-Buffering.html) has a way to control buffering on an already open file descriptor.


10 changes: 10 additions & 0 deletions scratch/pipes/s2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env python3

import subprocess
import time

import select

with open("spool.err","wb",buffering=0) as log:
p1 = subprocess.Popen(["python", "-u", "./spool.py"], bufsize=0, stdout=subprocess.PIPE, stderr=log)
import IPython; IPython.embed()
42 changes: 42 additions & 0 deletions scratch/pipes/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env python3

import os
import sys
from subprocess import Popen, PIPE

def produce(to_sed):
for i in ('beet', 'meet', 'sneet'):
to_sed.write(i + '\n')
to_sed.flush()

def consume(from_sed):
print "CONSUME"; sys.stdout.flush()
while 1:
print "TRYING TO READ from_sed"; sys.stdout.flush()
res = from_sed.readline()
print "READ from_sed"; sys.stdout.flush()
if not res:
sys.exit(0)
#sys.exit(proc.poll())
print 'received: ', [res]


def main():
#proc = ['sed', 's/ee/oo/g' ]
proc = ["./spool.py"]
log = open("spool.err","wb",buffering=0)
proc = Popen(proc,stdin=PIPE,stdout=PIPE,stderr=log)
to_sed = proc.stdin
from_sed = proc.stdout

pid = os.fork()
if pid == 0:
from_sed.close()
produce(to_sed)
return
else:
to_sed.close()
consume(from_sed)

if __name__ == '__main__':
main()
100 changes: 100 additions & 0 deletions scratch/pipes/s4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#! /bin/env python

import os
import sys
import signal
from subprocess import Popen, PIPE
import multiprocessing
from Queue import Empty

def produce(to_processor,pending):
while 1:
try:
item = pending.get(False)
except Empty:
item = ''
if item is produce:
#that's the signal to stop!
to_processor.close()
return
#must keep filling the buffer with something ('\n')
to_processor.write(item + '\n')
to_processor.flush()

def consume(from_processor,done):
while 1:
res = from_processor.readline()
print("READ THIS:", res)
if not res:
from_processor.close()
done.put(consume)
return
done.put(res)

def controller(pending, done):
for i in ('beet', 'meet', 'sneet'):
pending.put(i)
needed = 3
quantity_done = 0
while 1:
item = done.get()
if item and item != '\n':
if item.startswith('b'):
pending.put('Z' + item)
needed += 1
quantity_done +=1
print item,
if quantity_done == needed:
pending.put(produce)
break
while 1:
item = done.get()
if item is consume:
return

def main():
r'''
workflow:
producer -> processor -> consumer
^ /
\ v
pending done
^ /
\ v
controller
components:

controller: the original script
producer: a forked clone of controller
consumer: a forked clone of controller
processor: a subprocess.popen instance
pending: a multiprocessing queue
done: a multiprocessing queue
'''
#proc = ['sed', 's/ee/oo/g' ]
proc = ["./spool.py"]
proc = Popen(proc,stdin=PIPE,stdout=PIPE)
to_processor, from_processor = proc.stdin, proc.stdout

pending = multiprocessing.Queue()
pid = os.fork()
if pid == 0:
from_processor.close()
produce(to_processor,pending)
return
done = multiprocessing.Queue()
pid2 = os.fork()
if pid2 == 0:
to_processor.close()
consume(from_processor,done)
return
to_processor.close()
from_processor.close()
res = controller(pending, done)
os.waitpid(pid,0)
os.waitpid(pid2,0)
return res


if __name__ == '__main__':
main()
10 changes: 10 additions & 0 deletions scratch/pipes/spool.err
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
spooled #0
spooled #1
spooled #2
spooled #3
spooled #4
spooled #5
Traceback (most recent call last):
File "./spool.py", line 13, in <module>
print("[%d] %d" % (i, f))
BrokenPipeError: [Errno 32] Broken pipe
18 changes: 18 additions & 0 deletions scratch/pipes/spool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env python3

import time, sys

def fib():
a,b =1,1
while True:
yield a
a,b = b, a+b

if __name__ == '__main__':
for i,f in enumerate(fib()):
print("[%d] %d" % (i, f))

#sys.stdout.flush()
print("spooled #%d" % i, file=sys.stderr, flush=True)
time.sleep(2)

19 changes: 19 additions & 0 deletions scratch/pipes/subprocess.pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# from http://stackoverflow.com/questions/1595492/blocks-send-input-to-python-subprocess-pipeline
from subprocess import Popen, PIPE, STDOUT
import sys, time

#p1 = Popen(["grep", "-v", "notf"], stdin=PIPE, stdout=PIPE, close_fds=True)
p1 = Popen(["./spool.py"], stdout=PIPE, close_fds=True)
#print (p1.stdin, sys.stdin)
#p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
#p1.stdin.write(b'Hello World\n')
#p1.stdin.write(b"not this one\n");
#p1.stdin.flush()
#sys.stderr.write("now look in your process list for grep and cut\n")
#time.sleep(2)
#p1.stdin.close() #If I do not close, what happens?
sys.stderr.write("attempting to read from the end of the pipeline\n")
result = p1.stdout.raw.read(5)
sys.stderr.write("we read:n")
sys.stderr.write(result+"\n");
assert result == "Hello Worl\n"
68 changes: 0 additions & 68 deletions scratch/psql/README.md

This file was deleted.

File renamed without changes.
1 change: 1 addition & 0 deletions scratch/psql/attic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is the cruft code generated in writing postgres realtime replication, which now lives in src/backend/db
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
71 changes: 71 additions & 0 deletions scratch/psql/attic/experiments/forkit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@

# socat with the "fork" option holds open programs even after they've died
# maybe I need to check is sys.stdout exists????

import sys
import uuid

import threading, time
import atexit

I = uuid.uuid4()
i = 0

def alive():
global i
while True:
#a = input()
#print(a*2)
sys.stderr.write(str(I)+":"+str(i))
sys.stderr.write("\n")
sys.stderr.flush()
time.sleep(2)
i += 1

T = threading.Thread(target=alive);
T.start()

def q():
print("quittign")
sys.stderr.write("stderr::quitting\n"); sys.stderr.flush()
atexit.register(q)

# used with socat's EXEC address, e.g.
# socat UNIX-LISTEN:/tmp/sw,fork,reuseaddr EXEC:"python forkit.py"
# socat TCP-LISTEN:7777,fork,reuseaddr EXEC:"python forkit.py"
# and then connected to
# this is indeed shutdown properly by socat when the client disconnects.
# However, for some reason replicate.py doesn't shutdown.
#
# I know what's the problem:
# this is being hard-killed (-9'd, or something), and so the process simply dies without having a change to clean up after itself.
# The way I've written the code, it only calls Changes.__exit__() (--> so therefore calls unregister())
# if an exception happens *within* replicate--and not just that,
# but within the thread doing the spooling.
#
# ...so how do I fix this?
# I could trap the exit signal maybe, but that seems like a patchwork solution and won't run on the right thread
# I could have a monitor thread watching (via select(), even) sys.stdin or sys.stdout and looking for EOF --- I've got this written and it reliably catches the quit--and then raise a signal to trigger the main loop to quit
# ^ this is flakey, because there's no guarantee this thread will run between the socket closing and the kill coming
# I could restructure the code so that the spooling loop select()s on *both* the input (unix domain socket from inside of postgres) and output (stdout) streams. Then, when
# WORKING BUT AWKWARD SOLUTION: set end-close on the EXEC in socat; this makes socat forgo the; then, do the restructuring to watch for stdin falling over.
# CURRENTLY AWKWARD because I've written
# for delta in replicate(table):
# if select.select([sys.stdin], [], [],0)[0]:
# so replicants don't find out they should die until the next time a delta comes from the database. This isn't the end of the world, but it certainly provides a way (for an attacker?) to chew up resources if we also allow clients to indirectly control writes to the DB: open and close the client page a million times; this will spawn a million replicates which won't ever close
# I think the only other ways to detect the client dying are
# - rewrite as a socket app and check explicitly
# - use signal
# the monitoring thread is essentially no better than restructuring the code; the code will *still* have to be
# # Oh! If it is important that we quit, then make the main thread the one monitoring stdin
# and put
# hm but really
# ..oh, except.. well... we're using datagrams on the input side so we don't have...

# This is additionally made extra complicated by replicate.sh, since processes do not take their children with them by default
# --> it would be better if replicate.sh could be avoided -- all it does is set an environment var, and then only on OS X

while True:
time.sleep(1)


File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
13 changes: 0 additions & 13 deletions scratch/psql/client.sh

This file was deleted.

Loading