diff --git a/assets/libs/crossfilter.js b/assets/libs/crossfilter.js new file mode 100644 index 0000000..2beae8b --- /dev/null +++ b/assets/libs/crossfilter.js @@ -0,0 +1,1402 @@ +(function(exports){ +crossfilter.version = "1.3.9"; +function crossfilter_identity(d) { + return d; +} +crossfilter.permute = permute; + +function permute(array, index) { + for (var i = 0, n = index.length, copy = new Array(n); i < n; ++i) { + copy[i] = array[index[i]]; + } + return copy; +} +var bisect = crossfilter.bisect = bisect_by(crossfilter_identity); + +bisect.by = bisect_by; + +function bisect_by(f) { + + // Locate the insertion point for x in a to maintain sorted order. The + // arguments lo and hi may be used to specify a subset of the array which + // should be considered; by default the entire array is used. If x is already + // present in a, the insertion point will be before (to the left of) any + // existing entries. The return value is suitable for use as the first + // argument to `array.splice` assuming that a is already sorted. + // + // The returned insertion point i partitions the array a into two halves so + // that all v < x for v in a[lo:i] for the left side and all v >= x for v in + // a[i:hi] for the right side. + function bisectLeft(a, x, lo, hi) { + while (lo < hi) { + var mid = lo + hi >>> 1; + if (f(a[mid]) < x) lo = mid + 1; + else hi = mid; + } + return lo; + } + + // Similar to bisectLeft, but returns an insertion point which comes after (to + // the right of) any existing entries of x in a. + // + // The returned insertion point i partitions the array into two halves so that + // all v <= x for v in a[lo:i] for the left side and all v > x for v in + // a[i:hi] for the right side. + function bisectRight(a, x, lo, hi) { + while (lo < hi) { + var mid = lo + hi >>> 1; + if (x < f(a[mid])) hi = mid; + else lo = mid + 1; + } + return lo; + } + + bisectRight.right = bisectRight; + bisectRight.left = bisectLeft; + return bisectRight; +} +var heap = crossfilter.heap = heap_by(crossfilter_identity); + +heap.by = heap_by; + +function heap_by(f) { + + // Builds a binary heap within the specified array a[lo:hi]. The heap has the + // property such that the parent a[lo+i] is always less than or equal to its + // two children: a[lo+2*i+1] and a[lo+2*i+2]. + function heap(a, lo, hi) { + var n = hi - lo, + i = (n >>> 1) + 1; + while (--i > 0) sift(a, i, n, lo); + return a; + } + + // Sorts the specified array a[lo:hi] in descending order, assuming it is + // already a heap. + function sort(a, lo, hi) { + var n = hi - lo, + t; + while (--n > 0) t = a[lo], a[lo] = a[lo + n], a[lo + n] = t, sift(a, 1, n, lo); + return a; + } + + // Sifts the element a[lo+i-1] down the heap, where the heap is the contiguous + // slice of array a[lo:lo+n]. This method can also be used to update the heap + // incrementally, without incurring the full cost of reconstructing the heap. + function sift(a, i, n, lo) { + var d = a[--lo + i], + x = f(d), + child; + while ((child = i << 1) <= n) { + if (child < n && f(a[lo + child]) > f(a[lo + child + 1])) child++; + if (x <= f(a[lo + child])) break; + a[lo + i] = a[lo + child]; + i = child; + } + a[lo + i] = d; + } + + heap.sort = sort; + return heap; +} +var heapselect = crossfilter.heapselect = heapselect_by(crossfilter_identity); + +heapselect.by = heapselect_by; + +function heapselect_by(f) { + var heap = heap_by(f); + + // Returns a new array containing the top k elements in the array a[lo:hi]. + // The returned array is not sorted, but maintains the heap property. If k is + // greater than hi - lo, then fewer than k elements will be returned. The + // order of elements in a is unchanged by this operation. + function heapselect(a, lo, hi, k) { + var queue = new Array(k = Math.min(hi - lo, k)), + min, + i, + x, + d; + + for (i = 0; i < k; ++i) queue[i] = a[lo++]; + heap(queue, 0, k); + + if (lo < hi) { + min = f(queue[0]); + do { + if (x = f(d = a[lo]) > min) { + queue[0] = d; + min = f(heap(queue, 0, k)[0]); + } + } while (++lo < hi); + } + + return queue; + } + + return heapselect; +} +var insertionsort = crossfilter.insertionsort = insertionsort_by(crossfilter_identity); + +insertionsort.by = insertionsort_by; + +function insertionsort_by(f) { + + function insertionsort(a, lo, hi) { + for (var i = lo + 1; i < hi; ++i) { + for (var j = i, t = a[i], x = f(t); j > lo && f(a[j - 1]) > x; --j) { + a[j] = a[j - 1]; + } + a[j] = t; + } + return a; + } + + return insertionsort; +} +// Algorithm designed by Vladimir Yaroslavskiy. +// Implementation based on the Dart project; see lib/dart/LICENSE for details. + +var quicksort = crossfilter.quicksort = quicksort_by(crossfilter_identity); + +quicksort.by = quicksort_by; + +function quicksort_by(f) { + var insertionsort = insertionsort_by(f); + + function sort(a, lo, hi) { + return (hi - lo < quicksort_sizeThreshold + ? insertionsort + : quicksort)(a, lo, hi); + } + + function quicksort(a, lo, hi) { + // Compute the two pivots by looking at 5 elements. + var sixth = (hi - lo) / 6 | 0, + i1 = lo + sixth, + i5 = hi - 1 - sixth, + i3 = lo + hi - 1 >> 1, // The midpoint. + i2 = i3 - sixth, + i4 = i3 + sixth; + + var e1 = a[i1], x1 = f(e1), + e2 = a[i2], x2 = f(e2), + e3 = a[i3], x3 = f(e3), + e4 = a[i4], x4 = f(e4), + e5 = a[i5], x5 = f(e5); + + var t; + + // Sort the selected 5 elements using a sorting network. + if (x1 > x2) t = e1, e1 = e2, e2 = t, t = x1, x1 = x2, x2 = t; + if (x4 > x5) t = e4, e4 = e5, e5 = t, t = x4, x4 = x5, x5 = t; + if (x1 > x3) t = e1, e1 = e3, e3 = t, t = x1, x1 = x3, x3 = t; + if (x2 > x3) t = e2, e2 = e3, e3 = t, t = x2, x2 = x3, x3 = t; + if (x1 > x4) t = e1, e1 = e4, e4 = t, t = x1, x1 = x4, x4 = t; + if (x3 > x4) t = e3, e3 = e4, e4 = t, t = x3, x3 = x4, x4 = t; + if (x2 > x5) t = e2, e2 = e5, e5 = t, t = x2, x2 = x5, x5 = t; + if (x2 > x3) t = e2, e2 = e3, e3 = t, t = x2, x2 = x3, x3 = t; + if (x4 > x5) t = e4, e4 = e5, e5 = t, t = x4, x4 = x5, x5 = t; + + var pivot1 = e2, pivotValue1 = x2, + pivot2 = e4, pivotValue2 = x4; + + // e2 and e4 have been saved in the pivot variables. They will be written + // back, once the partitioning is finished. + a[i1] = e1; + a[i2] = a[lo]; + a[i3] = e3; + a[i4] = a[hi - 1]; + a[i5] = e5; + + var less = lo + 1, // First element in the middle partition. + great = hi - 2; // Last element in the middle partition. + + // Note that for value comparison, <, <=, >= and > coerce to a primitive via + // Object.prototype.valueOf; == and === do not, so in order to be consistent + // with natural order (such as for Date objects), we must do two compares. + var pivotsEqual = pivotValue1 <= pivotValue2 && pivotValue1 >= pivotValue2; + if (pivotsEqual) { + + // Degenerated case where the partitioning becomes a dutch national flag + // problem. + // + // [ | < pivot | == pivot | unpartitioned | > pivot | ] + // ^ ^ ^ ^ ^ + // left less k great right + // + // a[left] and a[right] are undefined and are filled after the + // partitioning. + // + // Invariants: + // 1) for x in ]left, less[ : x < pivot. + // 2) for x in [less, k[ : x == pivot. + // 3) for x in ]great, right[ : x > pivot. + for (var k = less; k <= great; ++k) { + var ek = a[k], xk = f(ek); + if (xk < pivotValue1) { + if (k !== less) { + a[k] = a[less]; + a[less] = ek; + } + ++less; + } else if (xk > pivotValue1) { + + // Find the first element <= pivot in the range [k - 1, great] and + // put [:ek:] there. We know that such an element must exist: + // When k == less, then el3 (which is equal to pivot) lies in the + // interval. Otherwise a[k - 1] == pivot and the search stops at k-1. + // Note that in the latter case invariant 2 will be violated for a + // short amount of time. The invariant will be restored when the + // pivots are put into their final positions. + while (true) { + var greatValue = f(a[great]); + if (greatValue > pivotValue1) { + great--; + // This is the only location in the while-loop where a new + // iteration is started. + continue; + } else if (greatValue < pivotValue1) { + // Triple exchange. + a[k] = a[less]; + a[less++] = a[great]; + a[great--] = ek; + break; + } else { + a[k] = a[great]; + a[great--] = ek; + // Note: if great < k then we will exit the outer loop and fix + // invariant 2 (which we just violated). + break; + } + } + } + } + } else { + + // We partition the list into three parts: + // 1. < pivot1 + // 2. >= pivot1 && <= pivot2 + // 3. > pivot2 + // + // During the loop we have: + // [ | < pivot1 | >= pivot1 && <= pivot2 | unpartitioned | > pivot2 | ] + // ^ ^ ^ ^ ^ + // left less k great right + // + // a[left] and a[right] are undefined and are filled after the + // partitioning. + // + // Invariants: + // 1. for x in ]left, less[ : x < pivot1 + // 2. for x in [less, k[ : pivot1 <= x && x <= pivot2 + // 3. for x in ]great, right[ : x > pivot2 + for (var k = less; k <= great; k++) { + var ek = a[k], xk = f(ek); + if (xk < pivotValue1) { + if (k !== less) { + a[k] = a[less]; + a[less] = ek; + } + ++less; + } else { + if (xk > pivotValue2) { + while (true) { + var greatValue = f(a[great]); + if (greatValue > pivotValue2) { + great--; + if (great < k) break; + // This is the only location inside the loop where a new + // iteration is started. + continue; + } else { + // a[great] <= pivot2. + if (greatValue < pivotValue1) { + // Triple exchange. + a[k] = a[less]; + a[less++] = a[great]; + a[great--] = ek; + } else { + // a[great] >= pivot1. + a[k] = a[great]; + a[great--] = ek; + } + break; + } + } + } + } + } + } + + // Move pivots into their final positions. + // We shrunk the list from both sides (a[left] and a[right] have + // meaningless values in them) and now we move elements from the first + // and third partition into these locations so that we can store the + // pivots. + a[lo] = a[less - 1]; + a[less - 1] = pivot1; + a[hi - 1] = a[great + 1]; + a[great + 1] = pivot2; + + // The list is now partitioned into three partitions: + // [ < pivot1 | >= pivot1 && <= pivot2 | > pivot2 ] + // ^ ^ ^ ^ + // left less great right + + // Recursive descent. (Don't include the pivot values.) + sort(a, lo, less - 1); + sort(a, great + 2, hi); + + if (pivotsEqual) { + // All elements in the second partition are equal to the pivot. No + // need to sort them. + return a; + } + + // In theory it should be enough to call _doSort recursively on the second + // partition. + // The Android source however removes the pivot elements from the recursive + // call if the second partition is too large (more than 2/3 of the list). + if (less < i1 && great > i5) { + var lessValue, greatValue; + while ((lessValue = f(a[less])) <= pivotValue1 && lessValue >= pivotValue1) ++less; + while ((greatValue = f(a[great])) <= pivotValue2 && greatValue >= pivotValue2) --great; + + // Copy paste of the previous 3-way partitioning with adaptions. + // + // We partition the list into three parts: + // 1. == pivot1 + // 2. > pivot1 && < pivot2 + // 3. == pivot2 + // + // During the loop we have: + // [ == pivot1 | > pivot1 && < pivot2 | unpartitioned | == pivot2 ] + // ^ ^ ^ + // less k great + // + // Invariants: + // 1. for x in [ *, less[ : x == pivot1 + // 2. for x in [less, k[ : pivot1 < x && x < pivot2 + // 3. for x in ]great, * ] : x == pivot2 + for (var k = less; k <= great; k++) { + var ek = a[k], xk = f(ek); + if (xk <= pivotValue1 && xk >= pivotValue1) { + if (k !== less) { + a[k] = a[less]; + a[less] = ek; + } + less++; + } else { + if (xk <= pivotValue2 && xk >= pivotValue2) { + while (true) { + var greatValue = f(a[great]); + if (greatValue <= pivotValue2 && greatValue >= pivotValue2) { + great--; + if (great < k) break; + // This is the only location inside the loop where a new + // iteration is started. + continue; + } else { + // a[great] < pivot2. + if (greatValue < pivotValue1) { + // Triple exchange. + a[k] = a[less]; + a[less++] = a[great]; + a[great--] = ek; + } else { + // a[great] == pivot1. + a[k] = a[great]; + a[great--] = ek; + } + break; + } + } + } + } + } + } + + // The second partition has now been cleared of pivot elements and looks + // as follows: + // [ * | > pivot1 && < pivot2 | * ] + // ^ ^ + // less great + // Sort the second partition using recursive descent. + + // The second partition looks as follows: + // [ * | >= pivot1 && <= pivot2 | * ] + // ^ ^ + // less great + // Simply sort it by recursive descent. + + return sort(a, less, great + 1); + } + + return sort; +} + +var quicksort_sizeThreshold = 32; +var crossfilter_array8 = crossfilter_arrayUntyped, + crossfilter_array16 = crossfilter_arrayUntyped, + crossfilter_array32 = crossfilter_arrayUntyped, + crossfilter_arrayLengthen = crossfilter_arrayLengthenUntyped, + crossfilter_arrayWiden = crossfilter_arrayWidenUntyped; + +if (typeof Uint8Array !== "undefined") { + crossfilter_array8 = function(n) { return new Uint8Array(n); }; + crossfilter_array16 = function(n) { return new Uint16Array(n); }; + crossfilter_array32 = function(n) { return new Uint32Array(n); }; + + crossfilter_arrayLengthen = function(array, length) { + if (array.length >= length) return array; + var copy = new array.constructor(length); + copy.set(array); + return copy; + }; + + crossfilter_arrayWiden = function(array, width) { + var copy; + switch (width) { + case 16: copy = crossfilter_array16(array.length); break; + case 32: copy = crossfilter_array32(array.length); break; + default: throw new Error("invalid array width!"); + } + copy.set(array); + return copy; + }; +} + +function crossfilter_arrayUntyped(n) { + var array = new Array(n), i = -1; + while (++i < n) array[i] = 0; + return array; +} + +function crossfilter_arrayLengthenUntyped(array, length) { + var n = array.length; + while (n < length) array[n++] = 0; + return array; +} + +function crossfilter_arrayWidenUntyped(array, width) { + if (width > 32) throw new Error("invalid array width!"); + return array; +} +function crossfilter_filterExact(bisect, value) { + return function(values) { + var n = values.length; + return [bisect.left(values, value, 0, n), bisect.right(values, value, 0, n)]; + }; +} + +function crossfilter_filterRange(bisect, range) { + var min = range[0], + max = range[1]; + return function(values) { + var n = values.length; + return [bisect.left(values, min, 0, n), bisect.left(values, max, 0, n)]; + }; +} + +function crossfilter_filterAll(values) { + return [0, values.length]; +} +function crossfilter_null() { + return null; +} +function crossfilter_zero() { + return 0; +} +function crossfilter_reduceIncrement(p) { + return p + 1; +} + +function crossfilter_reduceDecrement(p) { + return p - 1; +} + +function crossfilter_reduceAdd(f) { + return function(p, v) { + return p + +f(v); + }; +} + +function crossfilter_reduceSubtract(f) { + return function(p, v) { + return p - f(v); + }; +} +exports.crossfilter = crossfilter; + +function crossfilter() { + var crossfilter = { + add: add, + remove: removeData, + dimension: dimension, + groupAll: groupAll, + size: size + }; + + var data = [], // the records + n = 0, // the number of records; data.length + m = 0, // a bit mask representing which dimensions are in use + M = 8, // number of dimensions that can fit in `filters` + filters = crossfilter_array8(0), // M bits per record; 1 is filtered out + filterListeners = [], // when the filters change + dataListeners = [], // when data is added + removeDataListeners = []; // when data is removed + + // Adds the specified new records to this crossfilter. + function add(newData) { + var n0 = n, + n1 = newData.length; + + // If there's actually new data to add… + // Merge the new data into the existing data. + // Lengthen the filter bitset to handle the new records. + // Notify listeners (dimensions and groups) that new data is available. + if (n1) { + data = data.concat(newData); + filters = crossfilter_arrayLengthen(filters, n += n1); + dataListeners.forEach(function(l) { l(newData, n0, n1); }); + } + + return crossfilter; + } + + // Removes all records that match the current filters. + function removeData() { + var newIndex = crossfilter_index(n, n), + removed = []; + for (var i = 0, j = 0; i < n; ++i) { + if (filters[i]) newIndex[i] = j++; + else removed.push(i); + } + + // Remove all matching records from groups. + filterListeners.forEach(function(l) { l(0, [], removed); }); + + // Update indexes. + removeDataListeners.forEach(function(l) { l(newIndex); }); + + // Remove old filters and data by overwriting. + for (var i = 0, j = 0, k; i < n; ++i) { + if (k = filters[i]) { + if (i !== j) filters[j] = k, data[j] = data[i]; + ++j; + } + } + data.length = j; + while (n > j) filters[--n] = 0; + } + + // Adds a new dimension with the specified value accessor function. + function dimension(value) { + var dimension = { + filter: filter, + filterExact: filterExact, + filterRange: filterRange, + filterFunction: filterFunction, + filterAll: filterAll, + top: top, + bottom: bottom, + group: group, + groupAll: groupAll, + dispose: dispose, + remove: dispose // for backwards-compatibility + }; + + var one = ~m & -~m, // lowest unset bit as mask, e.g., 00001000 + zero = ~one, // inverted one, e.g., 11110111 + values, // sorted, cached array + index, // value rank ↦ object id + newValues, // temporary array storing newly-added values + newIndex, // temporary array storing newly-added index + sort = quicksort_by(function(i) { return newValues[i]; }), + refilter = crossfilter_filterAll, // for recomputing filter + refilterFunction, // the custom filter function in use + indexListeners = [], // when data is added + dimensionGroups = [], + lo0 = 0, + hi0 = 0; + + // Updating a dimension is a two-stage process. First, we must update the + // associated filters for the newly-added records. Once all dimensions have + // updated their filters, the groups are notified to update. + dataListeners.unshift(preAdd); + dataListeners.push(postAdd); + + removeDataListeners.push(removeData); + + // Incorporate any existing data into this dimension, and make sure that the + // filter bitset is wide enough to handle the new dimension. + m |= one; + if (M >= 32 ? !one : m & (1 << M) - 1) { + filters = crossfilter_arrayWiden(filters, M <<= 1); + } + preAdd(data, 0, n); + postAdd(data, 0, n); + + // Incorporates the specified new records into this dimension. + // This function is responsible for updating filters, values, and index. + function preAdd(newData, n0, n1) { + + // Permute new values into natural order using a sorted index. + newValues = newData.map(value); + newIndex = sort(crossfilter_range(n1), 0, n1); + newValues = permute(newValues, newIndex); + + // Bisect newValues to determine which new records are selected. + var bounds = refilter(newValues), lo1 = bounds[0], hi1 = bounds[1], i; + if (refilterFunction) { + for (i = 0; i < n1; ++i) { + if (!refilterFunction(newValues[i], i)) filters[newIndex[i] + n0] |= one; + } + } else { + for (i = 0; i < lo1; ++i) filters[newIndex[i] + n0] |= one; + for (i = hi1; i < n1; ++i) filters[newIndex[i] + n0] |= one; + } + + // If this dimension previously had no data, then we don't need to do the + // more expensive merge operation; use the new values and index as-is. + if (!n0) { + values = newValues; + index = newIndex; + lo0 = lo1; + hi0 = hi1; + return; + } + + var oldValues = values, + oldIndex = index, + i0 = 0, + i1 = 0; + + // Otherwise, create new arrays into which to merge new and old. + values = new Array(n); + index = crossfilter_index(n, n); + + // Merge the old and new sorted values, and old and new index. + for (i = 0; i0 < n0 && i1 < n1; ++i) { + if (oldValues[i0] < newValues[i1]) { + values[i] = oldValues[i0]; + index[i] = oldIndex[i0++]; + } else { + values[i] = newValues[i1]; + index[i] = newIndex[i1++] + n0; + } + } + + // Add any remaining old values. + for (; i0 < n0; ++i0, ++i) { + values[i] = oldValues[i0]; + index[i] = oldIndex[i0]; + } + + // Add any remaining new values. + for (; i1 < n1; ++i1, ++i) { + values[i] = newValues[i1]; + index[i] = newIndex[i1] + n0; + } + + // Bisect again to recompute lo0 and hi0. + bounds = refilter(values), lo0 = bounds[0], hi0 = bounds[1]; + } + + // When all filters have updated, notify index listeners of the new values. + function postAdd(newData, n0, n1) { + indexListeners.forEach(function(l) { l(newValues, newIndex, n0, n1); }); + newValues = newIndex = null; + } + + function removeData(reIndex) { + for (var i = 0, j = 0, k; i < n; ++i) { + if (filters[k = index[i]]) { + if (i !== j) values[j] = values[i]; + index[j] = reIndex[k]; + ++j; + } + } + values.length = j; + while (j < n) index[j++] = 0; + + // Bisect again to recompute lo0 and hi0. + var bounds = refilter(values); + lo0 = bounds[0], hi0 = bounds[1]; + } + + // Updates the selected values based on the specified bounds [lo, hi]. + // This implementation is used by all the public filter methods. + function filterIndexBounds(bounds) { + var lo1 = bounds[0], + hi1 = bounds[1]; + + if (refilterFunction) { + refilterFunction = null; + filterIndexFunction(function(d, i) { return lo1 <= i && i < hi1; }); + lo0 = lo1; + hi0 = hi1; + return dimension; + } + + var i, + j, + k, + added = [], + removed = []; + + // Fast incremental update based on previous lo index. + if (lo1 < lo0) { + for (i = lo1, j = Math.min(lo0, hi1); i < j; ++i) { + filters[k = index[i]] ^= one; + added.push(k); + } + } else if (lo1 > lo0) { + for (i = lo0, j = Math.min(lo1, hi0); i < j; ++i) { + filters[k = index[i]] ^= one; + removed.push(k); + } + } + + // Fast incremental update based on previous hi index. + if (hi1 > hi0) { + for (i = Math.max(lo1, hi0), j = hi1; i < j; ++i) { + filters[k = index[i]] ^= one; + added.push(k); + } + } else if (hi1 < hi0) { + for (i = Math.max(lo0, hi1), j = hi0; i < j; ++i) { + filters[k = index[i]] ^= one; + removed.push(k); + } + } + + lo0 = lo1; + hi0 = hi1; + filterListeners.forEach(function(l) { l(one, added, removed); }); + return dimension; + } + + // Filters this dimension using the specified range, value, or null. + // If the range is null, this is equivalent to filterAll. + // If the range is an array, this is equivalent to filterRange. + // Otherwise, this is equivalent to filterExact. + function filter(range) { + return range == null + ? filterAll() : Array.isArray(range) + ? filterRange(range) : typeof range === "function" + ? filterFunction(range) + : filterExact(range); + } + + // Filters this dimension to select the exact value. + function filterExact(value) { + return filterIndexBounds((refilter = crossfilter_filterExact(bisect, value))(values)); + } + + // Filters this dimension to select the specified range [lo, hi]. + // The lower bound is inclusive, and the upper bound is exclusive. + function filterRange(range) { + return filterIndexBounds((refilter = crossfilter_filterRange(bisect, range))(values)); + } + + // Clears any filters on this dimension. + function filterAll() { + return filterIndexBounds((refilter = crossfilter_filterAll)(values)); + } + + // Filters this dimension using an arbitrary function. + function filterFunction(f) { + refilter = crossfilter_filterAll; + + filterIndexFunction(refilterFunction = f); + + lo0 = 0; + hi0 = n; + + return dimension; + } + + function filterIndexFunction(f) { + var i, + k, + x, + added = [], + removed = []; + + for (i = 0; i < n; ++i) { + if (filters[k = index[i]] & one ^ !(x = f(values[i], i))) { + if (x) filters[k] &= zero, added.push(k); + else filters[k] |= one, removed.push(k); + } + } + filterListeners.forEach(function(l) { l(one, added, removed); }); + } + + // Returns the top K selected records based on this dimension's order. + // Note: observes this dimension's filter, unlike group and groupAll. + function top(k) { + var array = [], + i = hi0, + j; + + while (--i >= lo0 && k > 0) { + if (!filters[j = index[i]]) { + array.push(data[j]); + --k; + } + } + + return array; + } + + // Returns the bottom K selected records based on this dimension's order. + // Note: observes this dimension's filter, unlike group and groupAll. + function bottom(k) { + var array = [], + i = lo0, + j; + + while (i < hi0 && k > 0) { + if (!filters[j = index[i]]) { + array.push(data[j]); + --k; + } + i++; + } + + return array; + } + + // Adds a new group to this dimension, using the specified key function. + function group(key) { + var group = { + top: top, + all: all, + reduce: reduce, + reduceCount: reduceCount, + reduceSum: reduceSum, + order: order, + orderNatural: orderNatural, + size: size, + dispose: dispose, + remove: dispose // for backwards-compatibility + }; + + // Ensure that this group will be removed when the dimension is removed. + dimensionGroups.push(group); + + var groups, // array of {key, value} + groupIndex, // object id ↦ group id + groupWidth = 8, + groupCapacity = crossfilter_capacity(groupWidth), + k = 0, // cardinality + select, + heap, + reduceAdd, + reduceRemove, + reduceInitial, + update = crossfilter_null, + reset = crossfilter_null, + resetNeeded = true, + groupAll = key === crossfilter_null; + + if (arguments.length < 1) key = crossfilter_identity; + + // The group listens to the crossfilter for when any dimension changes, so + // that it can update the associated reduce values. It must also listen to + // the parent dimension for when data is added, and compute new keys. + filterListeners.push(update); + indexListeners.push(add); + removeDataListeners.push(removeData); + + // Incorporate any existing data into the grouping. + add(values, index, 0, n); + + // Incorporates the specified new values into this group. + // This function is responsible for updating groups and groupIndex. + function add(newValues, newIndex, n0, n1) { + var oldGroups = groups, + reIndex = crossfilter_index(k, groupCapacity), + add = reduceAdd, + initial = reduceInitial, + k0 = k, // old cardinality + i0 = 0, // index of old group + i1 = 0, // index of new record + j, // object id + g0, // old group + x0, // old key + x1, // new key + g, // group to add + x; // key of group to add + + // If a reset is needed, we don't need to update the reduce values. + if (resetNeeded) add = initial = crossfilter_null; + + // Reset the new groups (k is a lower bound). + // Also, make sure that groupIndex exists and is long enough. + groups = new Array(k), k = 0; + groupIndex = k0 > 1 ? crossfilter_arrayLengthen(groupIndex, n) : crossfilter_index(n, groupCapacity); + + // Get the first old key (x0 of g0), if it exists. + if (k0) x0 = (g0 = oldGroups[0]).key; + + // Find the first new key (x1), skipping NaN keys. + while (i1 < n1 && !((x1 = key(newValues[i1])) >= x1)) ++i1; + + // While new keys remain… + while (i1 < n1) { + + // Determine the lesser of the two current keys; new and old. + // If there are no old keys remaining, then always add the new key. + if (g0 && x0 <= x1) { + g = g0, x = x0; + + // Record the new index of the old group. + reIndex[i0] = k; + + // Retrieve the next old key. + if (g0 = oldGroups[++i0]) x0 = g0.key; + } else { + g = {key: x1, value: initial()}, x = x1; + } + + // Add the lesser group. + groups[k] = g; + + // Add any selected records belonging to the added group, while + // advancing the new key and populating the associated group index. + while (!(x1 > x)) { + groupIndex[j = newIndex[i1] + n0] = k; + if (!(filters[j] & zero)) g.value = add(g.value, data[j]); + if (++i1 >= n1) break; + x1 = key(newValues[i1]); + } + + groupIncrement(); + } + + // Add any remaining old groups that were greater than all new keys. + // No incremental reduce is needed; these groups have no new records. + // Also record the new index of the old group. + while (i0 < k0) { + groups[reIndex[i0] = k] = oldGroups[i0++]; + groupIncrement(); + } + + // If we added any new groups before any old groups, + // update the group index of all the old records. + if (k > i0) for (i0 = 0; i0 < n0; ++i0) { + groupIndex[i0] = reIndex[groupIndex[i0]]; + } + + // Modify the update and reset behavior based on the cardinality. + // If the cardinality is less than or equal to one, then the groupIndex + // is not needed. If the cardinality is zero, then there are no records + // and therefore no groups to update or reset. Note that we also must + // change the registered listener to point to the new method. + j = filterListeners.indexOf(update); + if (k > 1) { + update = updateMany; + reset = resetMany; + } else { + if (!k && groupAll) { + k = 1; + groups = [{key: null, value: initial()}]; + } + if (k === 1) { + update = updateOne; + reset = resetOne; + } else { + update = crossfilter_null; + reset = crossfilter_null; + } + groupIndex = null; + } + filterListeners[j] = update; + + // Count the number of added groups, + // and widen the group index as needed. + function groupIncrement() { + if (++k === groupCapacity) { + reIndex = crossfilter_arrayWiden(reIndex, groupWidth <<= 1); + groupIndex = crossfilter_arrayWiden(groupIndex, groupWidth); + groupCapacity = crossfilter_capacity(groupWidth); + } + } + } + + function removeData() { + if (k > 1) { + var oldK = k, + oldGroups = groups, + seenGroups = crossfilter_index(oldK, oldK); + + // Filter out non-matches by copying matching group index entries to + // the beginning of the array. + for (var i = 0, j = 0; i < n; ++i) { + if (filters[i]) { + seenGroups[groupIndex[j] = groupIndex[i]] = 1; + ++j; + } + } + + // Reassemble groups including only those groups that were referred + // to by matching group index entries. Note the new group index in + // seenGroups. + groups = [], k = 0; + for (i = 0; i < oldK; ++i) { + if (seenGroups[i]) { + seenGroups[i] = k++; + groups.push(oldGroups[i]); + } + } + + if (k > 1) { + // Reindex the group index using seenGroups to find the new index. + for (var i = 0; i < j; ++i) groupIndex[i] = seenGroups[groupIndex[i]]; + } else { + groupIndex = null; + } + filterListeners[filterListeners.indexOf(update)] = k > 1 + ? (reset = resetMany, update = updateMany) + : k === 1 ? (reset = resetOne, update = updateOne) + : reset = update = crossfilter_null; + } else if (k === 1) { + if (groupAll) return; + for (var i = 0; i < n; ++i) if (filters[i]) return; + groups = [], k = 0; + filterListeners[filterListeners.indexOf(update)] = + update = reset = crossfilter_null; + } + } + + // Reduces the specified selected or deselected records. + // This function is only used when the cardinality is greater than 1. + function updateMany(filterOne, added, removed) { + if (filterOne === one || resetNeeded) return; + + var i, + k, + n, + g; + + // Add the added values. + for (i = 0, n = added.length; i < n; ++i) { + if (!(filters[k = added[i]] & zero)) { + g = groups[groupIndex[k]]; + g.value = reduceAdd(g.value, data[k]); + } + } + + // Remove the removed values. + for (i = 0, n = removed.length; i < n; ++i) { + if ((filters[k = removed[i]] & zero) === filterOne) { + g = groups[groupIndex[k]]; + g.value = reduceRemove(g.value, data[k]); + } + } + } + + // Reduces the specified selected or deselected records. + // This function is only used when the cardinality is 1. + function updateOne(filterOne, added, removed) { + if (filterOne === one || resetNeeded) return; + + var i, + k, + n, + g = groups[0]; + + // Add the added values. + for (i = 0, n = added.length; i < n; ++i) { + if (!(filters[k = added[i]] & zero)) { + g.value = reduceAdd(g.value, data[k]); + } + } + + // Remove the removed values. + for (i = 0, n = removed.length; i < n; ++i) { + if ((filters[k = removed[i]] & zero) === filterOne) { + g.value = reduceRemove(g.value, data[k]); + } + } + } + + // Recomputes the group reduce values from scratch. + // This function is only used when the cardinality is greater than 1. + function resetMany() { + var i, + g; + + // Reset all group values. + for (i = 0; i < k; ++i) { + groups[i].value = reduceInitial(); + } + + // Add any selected records. + for (i = 0; i < n; ++i) { + if (!(filters[i] & zero)) { + g = groups[groupIndex[i]]; + g.value = reduceAdd(g.value, data[i]); + } + } + } + + // Recomputes the group reduce values from scratch. + // This function is only used when the cardinality is 1. + function resetOne() { + var i, + g = groups[0]; + + // Reset the singleton group values. + g.value = reduceInitial(); + + // Add any selected records. + for (i = 0; i < n; ++i) { + if (!(filters[i] & zero)) { + g.value = reduceAdd(g.value, data[i]); + } + } + } + + // Returns the array of group values, in the dimension's natural order. + function all() { + if (resetNeeded) reset(), resetNeeded = false; + return groups; + } + + // Returns a new array containing the top K group values, in reduce order. + function top(k) { + var top = select(all(), 0, groups.length, k); + return heap.sort(top, 0, top.length); + } + + // Sets the reduce behavior for this group to use the specified functions. + // This method lazily recomputes the reduce values, waiting until needed. + function reduce(add, remove, initial) { + reduceAdd = add; + reduceRemove = remove; + reduceInitial = initial; + resetNeeded = true; + return group; + } + + // A convenience method for reducing by count. + function reduceCount() { + return reduce(crossfilter_reduceIncrement, crossfilter_reduceDecrement, crossfilter_zero); + } + + // A convenience method for reducing by sum(value). + function reduceSum(value) { + return reduce(crossfilter_reduceAdd(value), crossfilter_reduceSubtract(value), crossfilter_zero); + } + + // Sets the reduce order, using the specified accessor. + function order(value) { + select = heapselect_by(valueOf); + heap = heap_by(valueOf); + function valueOf(d) { return value(d.value); } + return group; + } + + // A convenience method for natural ordering by reduce value. + function orderNatural() { + return order(crossfilter_identity); + } + + // Returns the cardinality of this group, irrespective of any filters. + function size() { + return k; + } + + // Removes this group and associated event listeners. + function dispose() { + var i = filterListeners.indexOf(update); + if (i >= 0) filterListeners.splice(i, 1); + i = indexListeners.indexOf(add); + if (i >= 0) indexListeners.splice(i, 1); + i = removeDataListeners.indexOf(removeData); + if (i >= 0) removeDataListeners.splice(i, 1); + return group; + } + + return reduceCount().orderNatural(); + } + + // A convenience function for generating a singleton group. + function groupAll() { + var g = group(crossfilter_null), all = g.all; + delete g.all; + delete g.top; + delete g.order; + delete g.orderNatural; + delete g.size; + g.value = function() { return all()[0].value; }; + return g; + } + + // Removes this dimension and associated groups and event listeners. + function dispose() { + dimensionGroups.forEach(function(group) { group.dispose(); }); + var i = dataListeners.indexOf(preAdd); + if (i >= 0) dataListeners.splice(i, 1); + i = dataListeners.indexOf(postAdd); + if (i >= 0) dataListeners.splice(i, 1); + i = removeDataListeners.indexOf(removeData); + if (i >= 0) removeDataListeners.splice(i, 1); + for (i = 0; i < n; ++i) filters[i] &= zero; + m &= zero; + return dimension; + } + + return dimension; + } + + // A convenience method for groupAll on a dummy dimension. + // This implementation can be optimized since it always has cardinality 1. + function groupAll() { + var group = { + reduce: reduce, + reduceCount: reduceCount, + reduceSum: reduceSum, + value: value, + dispose: dispose, + remove: dispose // for backwards-compatibility + }; + + var reduceValue, + reduceAdd, + reduceRemove, + reduceInitial, + resetNeeded = true; + + // The group listens to the crossfilter for when any dimension changes, so + // that it can update the reduce value. It must also listen to the parent + // dimension for when data is added. + filterListeners.push(update); + dataListeners.push(add); + + // For consistency; actually a no-op since resetNeeded is true. + add(data, 0, n); + + // Incorporates the specified new values into this group. + function add(newData, n0) { + var i; + + if (resetNeeded) return; + + // Add the added values. + for (i = n0; i < n; ++i) { + if (!filters[i]) { + reduceValue = reduceAdd(reduceValue, data[i]); + } + } + } + + // Reduces the specified selected or deselected records. + function update(filterOne, added, removed) { + var i, + k, + n; + + if (resetNeeded) return; + + // Add the added values. + for (i = 0, n = added.length; i < n; ++i) { + if (!filters[k = added[i]]) { + reduceValue = reduceAdd(reduceValue, data[k]); + } + } + + // Remove the removed values. + for (i = 0, n = removed.length; i < n; ++i) { + if (filters[k = removed[i]] === filterOne) { + reduceValue = reduceRemove(reduceValue, data[k]); + } + } + } + + // Recomputes the group reduce value from scratch. + function reset() { + var i; + + reduceValue = reduceInitial(); + + for (i = 0; i < n; ++i) { + if (!filters[i]) { + reduceValue = reduceAdd(reduceValue, data[i]); + } + } + } + + // Sets the reduce behavior for this group to use the specified functions. + // This method lazily recomputes the reduce value, waiting until needed. + function reduce(add, remove, initial) { + reduceAdd = add; + reduceRemove = remove; + reduceInitial = initial; + resetNeeded = true; + return group; + } + + // A convenience method for reducing by count. + function reduceCount() { + return reduce(crossfilter_reduceIncrement, crossfilter_reduceDecrement, crossfilter_zero); + } + + // A convenience method for reducing by sum(value). + function reduceSum(value) { + return reduce(crossfilter_reduceAdd(value), crossfilter_reduceSubtract(value), crossfilter_zero); + } + + // Returns the computed reduce value. + function value() { + if (resetNeeded) reset(), resetNeeded = false; + return reduceValue; + } + + // Removes this group and associated event listeners. + function dispose() { + var i = filterListeners.indexOf(update); + if (i >= 0) filterListeners.splice(i); + i = dataListeners.indexOf(add); + if (i >= 0) dataListeners.splice(i); + return group; + } + + return reduceCount(); + } + + // Returns the number of records in this crossfilter, irrespective of any filters. + function size() { + return n; + } + + return arguments.length + ? add(arguments[0]) + : crossfilter; +} + +// Returns an array of size n, big enough to store ids up to m. +function crossfilter_index(n, m) { + return (m < 0x101 + ? crossfilter_array8 : m < 0x10001 + ? crossfilter_array16 + : crossfilter_array32)(n); +} + +// Constructs a new array of size n, with sequential values from 0 to n - 1. +function crossfilter_range(n) { + var range = crossfilter_index(n, n); + for (var i = -1; ++i < n;) range[i] = i; + return range; +} + +function crossfilter_capacity(w) { + return w === 8 + ? 0x100 : w === 16 + ? 0x10000 + : 0x100000000; +} +})(typeof exports !== 'undefined' && exports || this); diff --git a/scratch/pipes/README.md b/scratch/pipes/README.md new file mode 100644 index 0000000..c2bf1ed --- /dev/null +++ b/scratch/pipes/README.md @@ -0,0 +1,36 @@ +Experimenting with constructing pipelines with subprocess.py + +The one big win that shell has over python is that doing "./spool | filter | reduce" is really easy and really efficient: as efficient as the individual programs +But the subprocess module is finicky about doing this sort of thing. + +[the docs](https://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline) claim you can build pipelines like this: +``` +output=`dmesg | grep hda` +# becomes +p1 = Popen(["dmesg"], stdout=PIPE) +p2 = Popen(["grep", "hda"], stdin=p1.stdout, stdout=PIPE) +p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. +output = p2.communicate()[0] +``` + +But that is misleading: `communicate()` buffers the input between the two processes. + +`spool.py` + `s2.py` demonstrates that to get things running in real time, the thing is for the producer +(`spool.py` or `dmesg` or whatever is on the left side of the pipe) to call `flush()` to get things moving. +Yet, running `spool.py` on a terminal gets output immediately. +Why is this? The pipe constructed by python by default has a buffer size of io.DEFAULT_BUFFER_SIZE(==8192). + Frustratingly, setting the 'bufsize' argument to 0 does *not* automatically give unbuffered .stdout, but it does set .stdout to be what would otherwise be .stdout.raw + There was [a patch](http://bugs.python.org/issue11459) to 3.1 and 3.2 that should have fixed it + But you need also the writer to not be doing its own buffering (just because your side of a pipe is unbuffered doesn't mean the other side is) +e.g. [see](http://chase-seibert.github.io/blog/2012/11/16/python-subprocess-asynchronous-read-stdout.html): """you need to make sure that the subprocess you are invoking is not doing its own buffering. It took me a bit to figure out that mysql does do that, which is what the --unbuffered flag is there to disable.""" and [this](http://stackoverflow.com/questions/107705/python-output-buffering) + + +* [Related technical links](http://bugs.python.org/issue19929) (hidden in a bug report) +* http://objectmix.com/python/383415-working-around-buffering-issues-when-writing-pipes.html + +By experiment, the buffer size on my computer seems to be somewhere around 10000000>>4 bytes. +Link above suggests it should be 65536 bytes. + +[some systems](http://www.gnu.org/software/libc/manual/html_node/Controlling-Buffering.html) has a way to control buffering on an already open file descriptor. + + diff --git a/scratch/pipes/s2.py b/scratch/pipes/s2.py new file mode 100644 index 0000000..622414e --- /dev/null +++ b/scratch/pipes/s2.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python3 + +import subprocess +import time + +import select + +with open("spool.err","wb",buffering=0) as log: + p1 = subprocess.Popen(["python", "-u", "./spool.py"], bufsize=0, stdout=subprocess.PIPE, stderr=log) + import IPython; IPython.embed() diff --git a/scratch/pipes/s3.py b/scratch/pipes/s3.py new file mode 100644 index 0000000..040c101 --- /dev/null +++ b/scratch/pipes/s3.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 + +import os +import sys +from subprocess import Popen, PIPE + +def produce(to_sed): + for i in ('beet', 'meet', 'sneet'): + to_sed.write(i + '\n') + to_sed.flush() + +def consume(from_sed): + print "CONSUME"; sys.stdout.flush() + while 1: + print "TRYING TO READ from_sed"; sys.stdout.flush() + res = from_sed.readline() + print "READ from_sed"; sys.stdout.flush() + if not res: + sys.exit(0) + #sys.exit(proc.poll()) + print 'received: ', [res] + + +def main(): + #proc = ['sed', 's/ee/oo/g' ] + proc = ["./spool.py"] + log = open("spool.err","wb",buffering=0) + proc = Popen(proc,stdin=PIPE,stdout=PIPE,stderr=log) + to_sed = proc.stdin + from_sed = proc.stdout + + pid = os.fork() + if pid == 0: + from_sed.close() + produce(to_sed) + return + else: + to_sed.close() + consume(from_sed) + +if __name__ == '__main__': + main() diff --git a/scratch/pipes/s4.py b/scratch/pipes/s4.py new file mode 100644 index 0000000..ef36a22 --- /dev/null +++ b/scratch/pipes/s4.py @@ -0,0 +1,100 @@ +#! /bin/env python + +import os +import sys +import signal +from subprocess import Popen, PIPE +import multiprocessing +from Queue import Empty + +def produce(to_processor,pending): + while 1: + try: + item = pending.get(False) + except Empty: + item = '' + if item is produce: + #that's the signal to stop! + to_processor.close() + return + #must keep filling the buffer with something ('\n') + to_processor.write(item + '\n') + to_processor.flush() + +def consume(from_processor,done): + while 1: + res = from_processor.readline() + print("READ THIS:", res) + if not res: + from_processor.close() + done.put(consume) + return + done.put(res) + +def controller(pending, done): + for i in ('beet', 'meet', 'sneet'): + pending.put(i) + needed = 3 + quantity_done = 0 + while 1: + item = done.get() + if item and item != '\n': + if item.startswith('b'): + pending.put('Z' + item) + needed += 1 + quantity_done +=1 + print item, + if quantity_done == needed: + pending.put(produce) + break + while 1: + item = done.get() + if item is consume: + return + +def main(): + r''' + workflow: + producer -> processor -> consumer + ^ / + \ v + pending done + ^ / + \ v + controller + components: + + controller: the original script + producer: a forked clone of controller + consumer: a forked clone of controller + processor: a subprocess.popen instance + pending: a multiprocessing queue + done: a multiprocessing queue + ''' + #proc = ['sed', 's/ee/oo/g' ] + proc = ["./spool.py"] + proc = Popen(proc,stdin=PIPE,stdout=PIPE) + to_processor, from_processor = proc.stdin, proc.stdout + + pending = multiprocessing.Queue() + pid = os.fork() + if pid == 0: + from_processor.close() + produce(to_processor,pending) + return + done = multiprocessing.Queue() + pid2 = os.fork() + if pid2 == 0: + to_processor.close() + consume(from_processor,done) + return + to_processor.close() + from_processor.close() + res = controller(pending, done) + os.waitpid(pid,0) + os.waitpid(pid2,0) + return res + + +if __name__ == '__main__': + main() diff --git a/scratch/pipes/spool.err b/scratch/pipes/spool.err new file mode 100644 index 0000000..516385d --- /dev/null +++ b/scratch/pipes/spool.err @@ -0,0 +1,10 @@ +spooled #0 +spooled #1 +spooled #2 +spooled #3 +spooled #4 +spooled #5 +Traceback (most recent call last): + File "./spool.py", line 13, in + print("[%d] %d" % (i, f)) +BrokenPipeError: [Errno 32] Broken pipe diff --git a/scratch/pipes/spool.py b/scratch/pipes/spool.py new file mode 100755 index 0000000..f648356 --- /dev/null +++ b/scratch/pipes/spool.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 + +import time, sys + +def fib(): + a,b =1,1 + while True: + yield a + a,b = b, a+b + +if __name__ == '__main__': + for i,f in enumerate(fib()): + print("[%d] %d" % (i, f)) + + #sys.stdout.flush() + print("spooled #%d" % i, file=sys.stderr, flush=True) + time.sleep(2) + diff --git a/scratch/pipes/subprocess.pipe.py b/scratch/pipes/subprocess.pipe.py new file mode 100644 index 0000000..0303867 --- /dev/null +++ b/scratch/pipes/subprocess.pipe.py @@ -0,0 +1,19 @@ +# from http://stackoverflow.com/questions/1595492/blocks-send-input-to-python-subprocess-pipeline +from subprocess import Popen, PIPE, STDOUT +import sys, time + +#p1 = Popen(["grep", "-v", "notf"], stdin=PIPE, stdout=PIPE, close_fds=True) +p1 = Popen(["./spool.py"], stdout=PIPE, close_fds=True) +#print (p1.stdin, sys.stdin) +#p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True) +#p1.stdin.write(b'Hello World\n') +#p1.stdin.write(b"not this one\n"); +#p1.stdin.flush() +#sys.stderr.write("now look in your process list for grep and cut\n") +#time.sleep(2) +#p1.stdin.close() #If I do not close, what happens? +sys.stderr.write("attempting to read from the end of the pipeline\n") +result = p1.stdout.raw.read(5) +sys.stderr.write("we read:n") +sys.stderr.write(result+"\n"); +assert result == "Hello Worl\n" diff --git a/scratch/psql/README.md b/scratch/psql/README.md deleted file mode 100644 index 81e2a88..0000000 --- a/scratch/psql/README.md +++ /dev/null @@ -1,68 +0,0 @@ - -A subproject to provide a framework for linking model data to the web in a way that supports sophisticated queries so you can get specific pieces of data, and that keeps data up to date without requiring the web-page to re-download the whole page each time. - -Postgres 9.4 has a feature called "Logical Replication", which does most of what we need, but it is not ready for prime time yet. - -We are currently thinking of using this with D3 for plots and maps. - -Alternative approaches to using data on the web include querying specific streams, CouchDB, and dat (dat-data.com). (What others?) - -Quickstart ---------- - -To start, make sure you have installed python, postgresql with the pl/python extension, and - -**warning: these instructions are not copypasteable. you need to think and understand before you use them** - -1) set up the database -``` -$ cd modex/scratch/psql -$ initdb data/ #initialize postgres('s data) -$ ./server.sh #start postgres -$ ./client.sh < replicate.pysql #load the replicate.py hooks into postgres -$ ./client.sh #open up postgres and set up some tables, e.g. the 'films' table from test.sql -``` - -2) run the websocket replication server -``` -$ cd modex/scratch/psql -$ ./replicate_server.sh 8081 films #in this case, ws://localhost:8081 will replicate table films -``` -The 8081 is important here, since that port number is hardcoded in the javscript. - -3) run the frontend -``` -$ cd modex/ -$ python -m SimpleHTTPServer #or http.server for python3 -$ firefox http://localhost:8000/src/frontend/pourgraph.html -``` -(and open up the js console to watch the action) - -4) apply some updates (e.g. the second batch of lines about films from test.sql) -Any `INSERT`, `UPDATE` or `DELETE` done on the command line should immediately show up in your js console. - - -To reset -``` -$ rm -r data/ -``` - and start at the top. - -Issues ------- - -* Security: exposing the raw SQL protocol to the web has lots of implicit problems. - Better idea: flesh out replicant.py until it can speak to postgres, have it reformat the WAL logs into JSON and ship those, read-only. We can even drop Websockify (though it might simply be easier and more reliable to chain a pipe + nc + websockify together) - -Files ------ - -* server.sh / client.sh : short bash scripts which launch a fresh Postgres instance in the local directory -* websocket.sh : run the websockify proxy, with automatic SSL cert generation. -* replicant.py : prototype implementation of the replication protocol. This is the main file and it reimplements what we need of http://www.postgresql.org/docs/current/static/protocol-replication.html in Python. -* replicant.js : postgres protocol in Javascript, from what was learned. This does not exist yet and would be a reimplementation of replication.py. It may or may not end up being needed. -* ????.js: shim which does datagram-to-stream reconstruction (since WebSockets, despite running over TCP, do not have a stream mode, which postgres (and many other) protocols assume) - -Links ------ - diff --git a/scratch/psql/FUD.txt b/scratch/psql/attic/FUD.txt similarity index 100% rename from scratch/psql/FUD.txt rename to scratch/psql/attic/FUD.txt diff --git a/scratch/psql/attic/README.md b/scratch/psql/attic/README.md new file mode 100644 index 0000000..4903430 --- /dev/null +++ b/scratch/psql/attic/README.md @@ -0,0 +1 @@ +This is the cruft code generated in writing postgres realtime replication, which now lives in src/backend/db diff --git a/scratch/psql/WAL/README.md b/scratch/psql/attic/WAL/README.md similarity index 100% rename from scratch/psql/WAL/README.md rename to scratch/psql/attic/WAL/README.md diff --git a/scratch/psql/WAL/messages.py b/scratch/psql/attic/WAL/messages.py similarity index 100% rename from scratch/psql/WAL/messages.py rename to scratch/psql/attic/WAL/messages.py diff --git a/scratch/psql/WAL/replicant.py b/scratch/psql/attic/WAL/replicant.py similarity index 100% rename from scratch/psql/WAL/replicant.py rename to scratch/psql/attic/WAL/replicant.py diff --git a/scratch/psql/WAL/replication_messages.py b/scratch/psql/attic/WAL/replication_messages.py similarity index 100% rename from scratch/psql/WAL/replication_messages.py rename to scratch/psql/attic/WAL/replication_messages.py diff --git a/scratch/psql/util.py b/scratch/psql/attic/WAL/util.py similarity index 100% rename from scratch/psql/util.py rename to scratch/psql/attic/WAL/util.py diff --git a/scratch/psql/attic/experiments/forkit.py b/scratch/psql/attic/experiments/forkit.py new file mode 100644 index 0000000..f5e7cf4 --- /dev/null +++ b/scratch/psql/attic/experiments/forkit.py @@ -0,0 +1,71 @@ + +# socat with the "fork" option holds open programs even after they've died +# maybe I need to check is sys.stdout exists???? + +import sys +import uuid + +import threading, time +import atexit + +I = uuid.uuid4() +i = 0 + +def alive(): + global i + while True: + #a = input() + #print(a*2) + sys.stderr.write(str(I)+":"+str(i)) + sys.stderr.write("\n") + sys.stderr.flush() + time.sleep(2) + i += 1 + +T = threading.Thread(target=alive); +T.start() + +def q(): + print("quittign") + sys.stderr.write("stderr::quitting\n"); sys.stderr.flush() +atexit.register(q) + +# used with socat's EXEC address, e.g. +# socat UNIX-LISTEN:/tmp/sw,fork,reuseaddr EXEC:"python forkit.py" +# socat TCP-LISTEN:7777,fork,reuseaddr EXEC:"python forkit.py" +# and then connected to +# this is indeed shutdown properly by socat when the client disconnects. +# However, for some reason replicate.py doesn't shutdown. +# +# I know what's the problem: +# this is being hard-killed (-9'd, or something), and so the process simply dies without having a change to clean up after itself. +# The way I've written the code, it only calls Changes.__exit__() (--> so therefore calls unregister()) +# if an exception happens *within* replicate--and not just that, +# but within the thread doing the spooling. +# +# ...so how do I fix this? +# I could trap the exit signal maybe, but that seems like a patchwork solution and won't run on the right thread +# I could have a monitor thread watching (via select(), even) sys.stdin or sys.stdout and looking for EOF --- I've got this written and it reliably catches the quit--and then raise a signal to trigger the main loop to quit +# ^ this is flakey, because there's no guarantee this thread will run between the socket closing and the kill coming +# I could restructure the code so that the spooling loop select()s on *both* the input (unix domain socket from inside of postgres) and output (stdout) streams. Then, when +# WORKING BUT AWKWARD SOLUTION: set end-close on the EXEC in socat; this makes socat forgo the; then, do the restructuring to watch for stdin falling over. +# CURRENTLY AWKWARD because I've written +# for delta in replicate(table): +# if select.select([sys.stdin], [], [],0)[0]: +# so replicants don't find out they should die until the next time a delta comes from the database. This isn't the end of the world, but it certainly provides a way (for an attacker?) to chew up resources if we also allow clients to indirectly control writes to the DB: open and close the client page a million times; this will spawn a million replicates which won't ever close +# I think the only other ways to detect the client dying are +# - rewrite as a socket app and check explicitly +# - use signal +# the monitoring thread is essentially no better than restructuring the code; the code will *still* have to be +# # Oh! If it is important that we quit, then make the main thread the one monitoring stdin +# and put +# hm but really +# ..oh, except.. well... we're using datagrams on the input side so we don't have... + +# This is additionally made extra complicated by replicate.sh, since processes do not take their children with them by default +# --> it would be better if replicate.sh could be avoided -- all it does is set an environment var, and then only on OS X + +while True: + time.sleep(1) + + diff --git a/scratch/psql/experiments/infiniteresult.py b/scratch/psql/attic/experiments/infiniteresult.py similarity index 100% rename from scratch/psql/experiments/infiniteresult.py rename to scratch/psql/attic/experiments/infiniteresult.py diff --git a/scratch/psql/experiments/instead_of.pysql b/scratch/psql/attic/experiments/instead_of.pysql similarity index 100% rename from scratch/psql/experiments/instead_of.pysql rename to scratch/psql/attic/experiments/instead_of.pysql diff --git a/scratch/psql/experiments/ni.py b/scratch/psql/attic/experiments/ni.py similarity index 100% rename from scratch/psql/experiments/ni.py rename to scratch/psql/attic/experiments/ni.py diff --git a/scratch/psql/experiments/nonblockingpipe.py b/scratch/psql/attic/experiments/nonblockingpipe.py similarity index 100% rename from scratch/psql/experiments/nonblockingpipe.py rename to scratch/psql/attic/experiments/nonblockingpipe.py diff --git a/scratch/psql/sel.py b/scratch/psql/attic/experiments/sel.py similarity index 100% rename from scratch/psql/sel.py rename to scratch/psql/attic/experiments/sel.py diff --git a/scratch/psql/experiments/sqlalchemysh b/scratch/psql/attic/experiments/sqlalchemysh similarity index 100% rename from scratch/psql/experiments/sqlalchemysh rename to scratch/psql/attic/experiments/sqlalchemysh diff --git a/scratch/psql/experiments/subtransaction.pysql b/scratch/psql/attic/experiments/subtransaction.pysql similarity index 100% rename from scratch/psql/experiments/subtransaction.pysql rename to scratch/psql/attic/experiments/subtransaction.pysql diff --git a/scratch/psql/experiments/t.pysql b/scratch/psql/attic/experiments/t.pysql similarity index 100% rename from scratch/psql/experiments/t.pysql rename to scratch/psql/attic/experiments/t.pysql diff --git a/scratch/psql/experiments/unixsocket.py b/scratch/psql/attic/experiments/unixsocket.py similarity index 100% rename from scratch/psql/experiments/unixsocket.py rename to scratch/psql/attic/experiments/unixsocket.py diff --git a/scratch/psql/experiments/velvet_client.py b/scratch/psql/attic/experiments/velvet_client.py similarity index 100% rename from scratch/psql/experiments/velvet_client.py rename to scratch/psql/attic/experiments/velvet_client.py diff --git a/scratch/psql/experiments/velvet_server.py b/scratch/psql/attic/experiments/velvet_server.py similarity index 100% rename from scratch/psql/experiments/velvet_server.py rename to scratch/psql/attic/experiments/velvet_server.py diff --git a/scratch/psql/websockify.md b/scratch/psql/attic/experiments/websockify.md similarity index 100% rename from scratch/psql/websockify.md rename to scratch/psql/attic/experiments/websockify.md diff --git a/scratch/psql/replicate.v1.py b/scratch/psql/attic/replicate.v1.py similarity index 100% rename from scratch/psql/replicate.v1.py rename to scratch/psql/attic/replicate.v1.py diff --git a/scratch/psql/watch.psql b/scratch/psql/attic/watch.psql similarity index 100% rename from scratch/psql/watch.psql rename to scratch/psql/attic/watch.psql diff --git a/scratch/psql/client.sh b/scratch/psql/client.sh deleted file mode 100755 index 15d30ce..0000000 --- a/scratch/psql/client.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env bash - - -pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null -cd $HERE - -psql -d postgres -h 127.0.0.1 -# -h "$HERE" - # initdb makes a default database called "postgres" so we just ride on that. - # -h makes postgres use the current directory look for its socket in $HERE - # ...except for some reason "postgres -k ." doesn't seem to actually write the socket file into $HERE, - # but it stops postgres from trying to write to /var/run/postgresql - #so I fall back on TCP, as usual. Which is fine, because that makes sniffing the traffic easier. diff --git a/scratch/psql/replicate.py b/scratch/psql/replicate.py deleted file mode 100755 index 3d87db5..0000000 --- a/scratch/psql/replicate.py +++ /dev/null @@ -1,197 +0,0 @@ -#!/usr/bin/env python -# usage: replicate table --> stdout posts a line-oriented stream of HRDJ -# third-time's-a-charm edition -# depends: sqlalchemy - -# depends: watch.pysql and replicate.pysql are loaded into postgres - -""" -TODO: - irritant: select() blocks ctrl-c, because it's an I/O wait (quick fix: give a long timeout on select and spin it in a polling loop) - -clean up the sql injections - -clean up naming ("my_pg"? really?) - -support views (filtered replication) ---- doing this right requires some finesse. I have the logic written down in replicant.py, but distinguishing each client from each other will be tricky. - -consider whether we can ditch SQLAlchemy; all this script does is issue simple SQL commands. - speaking DBAPI directly would be less heavy - - -unregister is not getting called on ctrl-c or other exceptions, even though I've - --> since the with: is inside of a generator,there's paths through the code which quit without going through the .__exit__(). Very annoying. - ...but it should mostly be blocking in Changes.__next__, so why is this such an obvious problem?? - -Concurrency bugs: - by adding artificial stalls (time.sleep()) to the script, you can demonstrate these to yourself - 1) writes that occur after "cur = " but before "changes = " go to the great bit bucket in the sky - solutions: - a. figure out some way to ask postgres for timeline location id (xid)s and ask it to replay from those certain poitns - b. Use an explicit table lock around acquiring the two cursors (NB: the changes cursor is not a SQL cursor) - - http://www.postgresql.org/docs/current/static/sql-lock.html / http://www.postgresql.org/docs/current/static/explicit-locking.html - ...i think that's the only one. We could get the changes cursor first, and then be in the equally difficult situation of having double-writes (so, a delete would show up as a nonexistent row and then a delete which would get confused, an insert would show up as a duplicated row, an update would show up as both -- or maybe it would show up as unable to be applied since the old row would). If a human was doing this work, these sorts of errors would be manageable, but for a computer this is equally difficult. - but since we make sure to (let the kernel) buffer changes for us simultaneously to the current state being written out, we should at least not miss anything in the gaps of writing out the current state. - -""" - -import os - -# there's deployment issues on OS X: -# http://stackoverflow.com/questions/16407995/psycopg2-image-not-found -os.environ["DYLD_LIBRARY_PATH"] = "/Applications/Postgres.app/Contents/Versions/9.3/lib/" -#+ ":" + os.environ.setdefault("DYLD_LIBRARY_PATH","") -import psycopg2 - -import sqlalchemy - - -import os, tempfile, socket, select -import json -import os - - -import logging - -logging.getLogger().setLevel(logging.DEBUG) - -DB_SOCKET = "data" #path to folder containing the postgres socket -DB_SOCKET = os.path.abspath(DB_SOCKET) #postgres can't handle relative paths -DB_CONN_STRING = "postgresql:///postgres?host=%s" % (DB_SOCKET,) -E = sqlalchemy.create_engine(DB_CONN_STRING) #TODO: deglobalize - -#import IPython; IPython.embed() - -# We are allowed to have multiple ResultProxies open during a single connection. -# - - - # this is code that should be library code - # but installing it such that postgres can read it - # and without stomping on other things too badly is hard - # so for now it is just loaded here over and over again -class Changes: - MTU = 2048 #maximum bytes to read per message - - def __init__(self, table): #TODO: support where clauses - self._table = table - self._stream_id = None - - def __enter__(self): - # set up our listening socket - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - self._sock.bind(tempfile.mktemp(prefix="pg_replicate_%s" % (self._table,))) #XXX RACE CONDITION; mktemp.__doc__ specifically says "this function is unsafe"; however, how else to make a socket? - # also using the table name in the file name is probably poor form - logging.info("listening for changes to %s on %s", self._table, self._sock.getsockname()) - - # register ourselves with the source - # XXX SQL injection here - # autocommit is explicitly turned on because SQLAlchemy assumes all selects are mutationless - # the docs expplicitly cover how to workaround this: http://docs.sqlalchemy.org/en/rel_0_9/core/connections.html#understanding-autocommit - C = E.connect() - r = C.execution_options(autocommit=True).execute("select * from my_pg_replicate_register('%s', '%s')" % (self._table, self._sock.getsockname())) - r = r.scalar() - logging.debug("register() result is: %s", r) - C.close() - - self._stream_id = r - - return self #oops! - - def __exit__(self, *args): - # unregister ourselves - # XXX SQL injection here - C = E.connect() - r = C.execution_options(autocommit=True).execute("select * from my_pg_replicate_unregister('%s')" % (self._stream_id)) - r = r.scalar() - logging.debug("unregister() result is: %s", r) - C.close() - - # shutdown the socket - os.unlink(self._sock.getsockname()) # necessary because we're using unix domain sockets - self._sock.close() - - def __iter__(self): - return self - - def __next__(self): - fread, fwrite, ferr = select.select([self._sock], [], [self._sock]) #block until some data is available - if ferr: - pass #XXX - else: - pkt = self._sock.recv(Changes.MTU) - pkt = pkt.decode('utf-8') - return pkt - - next = __next__ #py2 compatibility - - -def replicate(_table): - - - # XXX we might need to construct the Changes stream first - # If we do have concurrency problems, doing that at least guarantees that we don't miss any changes, though we might end up with duplicate rows or trying to delete nonexistent rows - - # 2) get a handle on the change stream beginning at the commit postgres was at *now* (?? maybe this involves locking?) - # XXX we're relying on the time between the select and Changes.__enter__() to be small enough to be atomically - # but that's never absolutely true so this has a race condition! - # It would be better if we could ask postgres - # "what is the lamport clock of our previous select", which postgres internally records [CITE: ....] - # And then say "with Changes(_table,[ columns,][ where,] from=clock)" - # (NB: a lamport clock is a count of events; it has nothing to do with real time except that it always increases in time, making it suitable for synchronizing concurrent processes even when their system clocks might skew) - # but postgres doesn't seem(?) to provide a way to extract; it just uses timelines to make sure every session sees a consistent set of data. - # this is sort of tricky - # I need to say somethign like - - with Changes(_table) as changes: #<-- use with to get the benefits of RAII, since Changes has a listening endpoint to worry about cleaning up - - # README: BUGFIX: the change to raw_connection() caused a deadlock which only occurs the first time register() is called: register() needs to create a trigger on _table, but cur holds a lock on _table - # it seems, however, that reordering the instructions avoids the deadlock - # and i was already considering doing this; this order means we potentially have overlapping state in the Changes and cur feeds - # 1) get a cursor on the current query - #plan = plpy.prepare("select * from $1", ["text"]) # use a planner object to safeguard against SQL injection #<--- ugh, but postgres disagrees with this; I guess it doesn't want the table name to be dynamic.. - #print("the plan is", plan) - #cur = plpy.cursor(plan, [_table]); - # stream_results is turned on for this query so that this line takes as little time as possible - - C_DBAPI = E.raw_connection() - #cur = C.execution_options(stream_results=False).execute("select * from %s" % (_table,)) - - cur = C_DBAPI.cursor() - # XXX this needs to be wrapped in a try: ... finally: C_DBAPI.close() - cur.execute("select * from %s" % (_table,)) - - # 3) spool out the current state - # --------------------------------------------------- - #import IPython; IPython.embed() - keys = [col.name for col in cur.description] #low level SQLAlchemy (psycopg2, in this case) - #keys = cur.keys() #SQLAlchemy - for row in cur: - row = dict(zip(keys, row)) #coerce the SQLAlchemy row format to a dictionary - delta = {"+": row} #convert row to our made up delta format; the existing rows can all be considered inserts - delta = json.dumps(delta) #and then to JSON - yield delta - # do I need to explicitly close the cursor? - - cur.close() - C_DBAPI.close() - - # 4) spin, spooling out the change stream - # --------------------------------------------------- - for delta in changes: - # we assume that the source (watch_table()) has already jsonified things for us; THIS MIGHT BE A MISTAKE - yield delta - # NOTREACHED (unless something crashes, the changes feed should be infinite, and a crash would crash before this line anyway) - - -if __name__ == '__main__': - - import sys - table = sys.argv[1] - - for delta in replicate(table): - #print(delta, flush=True) #py3 - print delta; sys.stdout.flush() #py2 - - # NOTREACHED - diff --git a/scratch/psql/server.sh b/scratch/psql/server.sh deleted file mode 100755 index 0d7be8b..0000000 --- a/scratch/psql/server.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env bash - -# before running this, do -# initdb ./data/ -# - -if [ `uname` = "Darwin" ]; then - # OS X has several possible python distros - # our supported configuration uses Postgres.app, - # which is built linked against the Apple distro of python - # so we need to ensure the Apple python is the one loaded - # or else strange library errors will crop up - # - # Unfortunately, I'm having trouble building psycopg2 against the system python, so we clients (ie replicate.py) has to be run with Anaconda Python - - export PATH=/usr/bin:$PATH -fi - -pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null -cd $HERE - -postgres -D ./data/ -k . diff --git a/scratch/psql/test.replicant.sh b/scratch/psql/test.replicant.sh deleted file mode 100755 index 77499cf..0000000 --- a/scratch/psql/test.replicant.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env sh -# demonstrate how to run the replicant - -echo '{"table": "films"}' | ./replicant.py 2>/dev/null - -# in future, this will probably be: -#./replicant.py "films" 2>/dev/null diff --git a/scratch/remoting/socks5.ws/README.md b/scratch/remoting/socks5.ws/README.md new file mode 100644 index 0000000..9b1eb13 --- /dev/null +++ b/scratch/remoting/socks5.ws/README.md @@ -0,0 +1,49 @@ +SOCKS5 in WebSockets +==================== + + +SOCKS5 is a dead simple little protocol that makes very thin TCP and UDP proxies. +SOCKS is notable because it allows proxying. Since this + +This code implements it on top of WebSockets. If you point [websockify](https://github.com/kanaka/websockify/) at a SOCKS proxy + + +Demo +---- + + +Run a SOCKS server. The quickest is: +``` +$ ssh -D 7777 localhost +``` + +Run websockify in front of that SOCKS server +``` +$ websockify 8081 localhost:7777 +``` + +Run the socks5.js client: +``` +$ node test.socks5.js google.com:80 +``` + +You probably need to `npm install ws` and `npm install ayepromise` first. + +This gets more interesting if instead of sshing in the first step to localhost, you ssh somewhere you have a shell account. +Then the demo script will appear to google (or whoever you hit) to be coming from the system you have a shell account on. + +The same library also works in your browser! + +SOCKS software +-------------- + +Servers: + +* [ssh](http://www.openssh.com/) has a `-D` switch which makes your local machine into a SOCKS proxy, tunneling through to the other end of your ssh session. By default it only allows connections. +* [dante](http://www.inet.no/dante/) is a little more fully featured SOCKS system. +* [tor](http://torproject.org) relies totally on SOCKS to move your traffic off your computer and into the TOR mixnet. + +Clients: + +* Most browsers support SOCKS5. Look under Network Settings in Firefox. +* [tsocks](http://tsocks.sourceforge.net/) which was forked to [torsocks](https://code.google.com/p/torsocks/) which wraps any Unix program through a SOCKS proxy. diff --git a/scratch/remoting/socks5.ws/RFC 1928 - SOCKS Protocol Version 5.htm b/scratch/remoting/socks5.ws/RFC 1928 - SOCKS Protocol Version 5.htm new file mode 100644 index 0000000..0e81d26 --- /dev/null +++ b/scratch/remoting/socks5.ws/RFC 1928 - SOCKS Protocol Version 5.htm @@ -0,0 +1,660 @@ + + + + + + + + + + + + + + + + RFC 1928 - SOCKS Protocol Version 5 + + + + + + + + +
+
+ +
+[Docs] [txt|pdf] [draft-ietf-aft-so...] [Diff1] [Diff2] [Errata]
+
+ PROPOSED STANDARD
+ Errata Exist
+
+Network Working Group                                           M. Leech
+Request for Comments: 1928                    Bell-Northern Research Ltd
+Category: Standards Track                                       M. Ganis
+                                         International Business Machines
+                                                                  Y. Lee
+                                                  NEC Systems Laboratory
+                                                                R. Kuris
+                                                       Unify Corporation
+                                                               D. Koblas
+                                                  Independent Consultant
+                                                                L. Jones
+                                                 Hewlett-Packard Company
+                                                              March 1996
+
+
+                        SOCKS Protocol Version 5
+
+Status of this Memo
+
+   This document specifies an Internet standards track protocol for the
+   Internet community, and requests discussion and suggestions for
+   improvements.  Please refer to the current edition of the "Internet
+   Official Protocol Standards" (STD 1) for the standardization state
+   and status of this protocol.  Distribution of this memo is unlimited.
+
+Acknowledgments
+
+   This memo describes a protocol that is an evolution of the previous
+   version of the protocol, version 4 [1]. This new protocol stems from
+   active discussions and prototype implementations.  The key
+   contributors are: Marcus Leech: Bell-Northern Research, David Koblas:
+   Independent Consultant, Ying-Da Lee: NEC Systems Laboratory, LaMont
+   Jones: Hewlett-Packard Company, Ron Kuris: Unify Corporation, Matt
+   Ganis: International Business Machines.
+
+1.  Introduction
+
+   The use of network firewalls, systems that effectively isolate an
+   organizations internal network structure from an exterior network,
+   such as the INTERNET is becoming increasingly popular.  These
+   firewall systems typically act as application-layer gateways between
+   networks, usually offering controlled TELNET, FTP, and SMTP access.
+   With the emergence of more sophisticated application layer protocols
+   designed to facilitate global information discovery, there exists a
+   need to provide a general framework for these protocols to
+   transparently and securely traverse a firewall.
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 1]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   There exists, also, a need for strong authentication of such
+   traversal in as fine-grained a manner as is practical. This
+   requirement stems from the realization that client-server
+   relationships emerge between the networks of various organizations,
+   and that such relationships need to be controlled and often strongly
+   authenticated.
+
+   The protocol described here is designed to provide a framework for
+   client-server applications in both the TCP and UDP domains to
+   conveniently and securely use the services of a network firewall.
+   The protocol is conceptually a "shim-layer" between the application
+   layer and the transport layer, and as such does not provide network-
+   layer gateway services, such as forwarding of ICMP messages.
+
+2.  Existing practice
+
+   There currently exists a protocol, SOCKS Version 4, that provides for
+   unsecured firewall traversal for TCP-based client-server
+   applications, including TELNET, FTP and the popular information-
+   discovery protocols such as HTTP, WAIS and GOPHER.
+
+   This new protocol extends the SOCKS Version 4 model to include UDP,
+   and extends the framework to include provisions for generalized
+   strong authentication schemes, and extends the addressing scheme to
+   encompass domain-name and V6 IP addresses.
+
+   The implementation of the SOCKS protocol typically involves the
+   recompilation or relinking of TCP-based client applications to use
+   the appropriate encapsulation routines in the SOCKS library.
+
+Note:
+
+   Unless otherwise noted, the decimal numbers appearing in packet-
+   format diagrams represent the length of the corresponding field, in
+   octets.  Where a given octet must take on a specific value, the
+   syntax X'hh' is used to denote the value of the single octet in that
+   field. When the word 'Variable' is used, it indicates that the
+   corresponding field has a variable length defined either by an
+   associated (one or two octet) length field, or by a data type field.
+
+3.  Procedure for TCP-based clients
+
+   When a TCP-based client wishes to establish a connection to an object
+   that is reachable only via a firewall (such determination is left up
+   to the implementation), it must open a TCP connection to the
+   appropriate SOCKS port on the SOCKS server system.  The SOCKS service
+   is conventionally located on TCP port 1080.  If the connection
+   request succeeds, the client enters a negotiation for the
+
+
+
+Leech, et al                Standards Track                     [Page 2]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   authentication method to be used, authenticates with the chosen
+   method, then sends a relay request.  The SOCKS server evaluates the
+   request, and either establishes the appropriate connection or denies
+   it.
+
+   Unless otherwise noted, the decimal numbers appearing in packet-
+   format diagrams represent the length of the corresponding field, in
+   octets.  Where a given octet must take on a specific value, the
+   syntax X'hh' is used to denote the value of the single octet in that
+   field. When the word 'Variable' is used, it indicates that the
+   corresponding field has a variable length defined either by an
+   associated (one or two octet) length field, or by a data type field.
+
+   The client connects to the server, and sends a version
+   identifier/method selection message:
+
+                   +----+----------+----------+
+                   |VER | NMETHODS | METHODS  |
+                   +----+----------+----------+
+                   | 1  |    1     | 1 to 255 |
+                   +----+----------+----------+
+
+   The VER field is set to X'05' for this version of the protocol.  The
+   NMETHODS field contains the number of method identifier octets that
+   appear in the METHODS field.
+
+   The server selects from one of the methods given in METHODS, and
+   sends a METHOD selection message:
+
+                         +----+--------+
+                         |VER | METHOD |
+                         +----+--------+
+                         | 1  |   1    |
+                         +----+--------+
+
+   If the selected METHOD is X'FF', none of the methods listed by the
+   client are acceptable, and the client MUST close the connection.
+
+   The values currently defined for METHOD are:
+
+          o  X'00' NO AUTHENTICATION REQUIRED
+          o  X'01' GSSAPI
+          o  X'02' USERNAME/PASSWORD
+          o  X'03' to X'7F' IANA ASSIGNED
+          o  X'80' to X'FE' RESERVED FOR PRIVATE METHODS
+          o  X'FF' NO ACCEPTABLE METHODS
+
+   The client and server then enter a method-specific sub-negotiation.
+
+
+
+Leech, et al                Standards Track                     [Page 3]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   Descriptions of the method-dependent sub-negotiations appear in
+   separate memos.
+
+   Developers of new METHOD support for this protocol should contact
+   IANA for a METHOD number.  The ASSIGNED NUMBERS document should be
+   referred to for a current list of METHOD numbers and their
+   corresponding protocols.
+
+   Compliant implementations MUST support GSSAPI and SHOULD support
+   USERNAME/PASSWORD authentication methods.
+
+4.  Requests
+
+   Once the method-dependent subnegotiation has completed, the client
+   sends the request details.  If the negotiated method includes
+   encapsulation for purposes of integrity checking and/or
+   confidentiality, these requests MUST be encapsulated in the method-
+   dependent encapsulation.
+
+   The SOCKS request is formed as follows:
+
+        +----+-----+-------+------+----------+----------+
+        |VER | CMD |  RSV  | ATYP | DST.ADDR | DST.PORT |
+        +----+-----+-------+------+----------+----------+
+        | 1  |  1  | X'00' |  1   | Variable |    2     |
+        +----+-----+-------+------+----------+----------+
+
+     Where:
+
+          o  VER    protocol version: X'05'
+          o  CMD
+             o  CONNECT X'01'
+             o  BIND X'02'
+             o  UDP ASSOCIATE X'03'
+          o  RSV    RESERVED
+          o  ATYP   address type of following address
+             o  IP V4 address: X'01'
+             o  DOMAINNAME: X'03'
+             o  IP V6 address: X'04'
+          o  DST.ADDR       desired destination address
+          o  DST.PORT desired destination port in network octet
+             order
+
+   The SOCKS server will typically evaluate the request based on source
+   and destination addresses, and return one or more reply messages, as
+   appropriate for the request type.
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 4]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+5.  Addressing
+
+   In an address field (DST.ADDR, BND.ADDR), the ATYP field specifies
+   the type of address contained within the field:
+
+          o  X'01'
+
+   the address is a version-4 IP address, with a length of 4 octets
+
+          o  X'03'
+
+   the address field contains a fully-qualified domain name.  The first
+   octet of the address field contains the number of octets of name that
+   follow, there is no terminating NUL octet.
+
+          o  X'04'
+
+   the address is a version-6 IP address, with a length of 16 octets.
+
+6.  Replies
+
+   The SOCKS request information is sent by the client as soon as it has
+   established a connection to the SOCKS server, and completed the
+   authentication negotiations.  The server evaluates the request, and
+   returns a reply formed as follows:
+
+        +----+-----+-------+------+----------+----------+
+        |VER | REP |  RSV  | ATYP | BND.ADDR | BND.PORT |
+        +----+-----+-------+------+----------+----------+
+        | 1  |  1  | X'00' |  1   | Variable |    2     |
+        +----+-----+-------+------+----------+----------+
+
+     Where:
+
+          o  VER    protocol version: X'05'
+          o  REP    Reply field:
+             o  X'00' succeeded
+             o  X'01' general SOCKS server failure
+             o  X'02' connection not allowed by ruleset
+             o  X'03' Network unreachable
+             o  X'04' Host unreachable
+             o  X'05' Connection refused
+             o  X'06' TTL expired
+             o  X'07' Command not supported
+             o  X'08' Address type not supported
+             o  X'09' to X'FF' unassigned
+          o  RSV    RESERVED
+          o  ATYP   address type of following address
+
+
+
+Leech, et al                Standards Track                     [Page 5]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+             o  IP V4 address: X'01'
+             o  DOMAINNAME: X'03'
+             o  IP V6 address: X'04'
+          o  BND.ADDR       server bound address
+          o  BND.PORT       server bound port in network octet order
+
+   Fields marked RESERVED (RSV) must be set to X'00'.
+
+   If the chosen method includes encapsulation for purposes of
+   authentication, integrity and/or confidentiality, the replies are
+   encapsulated in the method-dependent encapsulation.
+
+CONNECT
+
+   In the reply to a CONNECT, BND.PORT contains the port number that the
+   server assigned to connect to the target host, while BND.ADDR
+   contains the associated IP address.  The supplied BND.ADDR is often
+   different from the IP address that the client uses to reach the SOCKS
+   server, since such servers are often multi-homed.  It is expected
+   that the SOCKS server will use DST.ADDR and DST.PORT, and the
+   client-side source address and port in evaluating the CONNECT
+   request.
+
+BIND
+
+   The BIND request is used in protocols which require the client to
+   accept connections from the server.  FTP is a well-known example,
+   which uses the primary client-to-server connection for commands and
+   status reports, but may use a server-to-client connection for
+   transferring data on demand (e.g. LS, GET, PUT).
+
+   It is expected that the client side of an application protocol will
+   use the BIND request only to establish secondary connections after a
+   primary connection is established using CONNECT.  In is expected that
+   a SOCKS server will use DST.ADDR and DST.PORT in evaluating the BIND
+   request.
+
+   Two replies are sent from the SOCKS server to the client during a
+   BIND operation.  The first is sent after the server creates and binds
+   a new socket.  The BND.PORT field contains the port number that the
+   SOCKS server assigned to listen for an incoming connection.  The
+   BND.ADDR field contains the associated IP address.  The client will
+   typically use these pieces of information to notify (via the primary
+   or control connection) the application server of the rendezvous
+   address.  The second reply occurs only after the anticipated incoming
+   connection succeeds or fails.
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 6]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   In the second reply, the BND.PORT and BND.ADDR fields contain the
+   address and port number of the connecting host.
+
+UDP ASSOCIATE
+
+   The UDP ASSOCIATE request is used to establish an association within
+   the UDP relay process to handle UDP datagrams.  The DST.ADDR and
+   DST.PORT fields contain the address and port that the client expects
+   to use to send UDP datagrams on for the association.  The server MAY
+   use this information to limit access to the association.  If the
+   client is not in possesion of the information at the time of the UDP
+   ASSOCIATE, the client MUST use a port number and address of all
+   zeros.
+
+   A UDP association terminates when the TCP connection that the UDP
+   ASSOCIATE request arrived on terminates.
+
+   In the reply to a UDP ASSOCIATE request, the BND.PORT and BND.ADDR
+   fields indicate the port number/address where the client MUST send
+   UDP request messages to be relayed.
+
+Reply Processing
+
+   When a reply (REP value other than X'00') indicates a failure, the
+   SOCKS server MUST terminate the TCP connection shortly after sending
+   the reply.  This must be no more than 10 seconds after detecting the
+   condition that caused a failure.
+
+   If the reply code (REP value of X'00') indicates a success, and the
+   request was either a BIND or a CONNECT, the client may now start
+   passing data.  If the selected authentication method supports
+   encapsulation for the purposes of integrity, authentication and/or
+   confidentiality, the data are encapsulated using the method-dependent
+   encapsulation.  Similarly, when data arrives at the SOCKS server for
+   the client, the server MUST encapsulate the data as appropriate for
+   the authentication method in use.
+
+7.  Procedure for UDP-based clients
+
+   A UDP-based client MUST send its datagrams to the UDP relay server at
+   the UDP port indicated by BND.PORT in the reply to the UDP ASSOCIATE
+   request.  If the selected authentication method provides
+   encapsulation for the purposes of authenticity, integrity, and/or
+   confidentiality, the datagram MUST be encapsulated using the
+   appropriate encapsulation.  Each UDP datagram carries a UDP request
+   header with it:
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 7]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+      +----+------+------+----------+----------+----------+
+      |RSV | FRAG | ATYP | DST.ADDR | DST.PORT |   DATA   |
+      +----+------+------+----------+----------+----------+
+      | 2  |  1   |  1   | Variable |    2     | Variable |
+      +----+------+------+----------+----------+----------+
+
+     The fields in the UDP request header are:
+
+          o  RSV  Reserved X'0000'
+          o  FRAG    Current fragment number
+          o  ATYP    address type of following addresses:
+             o  IP V4 address: X'01'
+             o  DOMAINNAME: X'03'
+             o  IP V6 address: X'04'
+          o  DST.ADDR       desired destination address
+          o  DST.PORT       desired destination port
+          o  DATA     user data
+
+   When a UDP relay server decides to relay a UDP datagram, it does so
+   silently, without any notification to the requesting client.
+   Similarly, it will drop datagrams it cannot or will not relay.  When
+   a UDP relay server receives a reply datagram from a remote host, it
+   MUST encapsulate that datagram using the above UDP request header,
+   and any authentication-method-dependent encapsulation.
+
+   The UDP relay server MUST acquire from the SOCKS server the expected
+   IP address of the client that will send datagrams to the BND.PORT
+   given in the reply to UDP ASSOCIATE.  It MUST drop any datagrams
+   arriving from any source IP address other than the one recorded for
+   the particular association.
+
+   The FRAG field indicates whether or not this datagram is one of a
+   number of fragments.  If implemented, the high-order bit indicates
+   end-of-fragment sequence, while a value of X'00' indicates that this
+   datagram is standalone.  Values between 1 and 127 indicate the
+   fragment position within a fragment sequence.  Each receiver will
+   have a REASSEMBLY QUEUE and a REASSEMBLY TIMER associated with these
+   fragments.  The reassembly queue must be reinitialized and the
+   associated fragments abandoned whenever the REASSEMBLY TIMER expires,
+   or a new datagram arrives carrying a FRAG field whose value is less
+   than the highest FRAG value processed for this fragment sequence.
+   The reassembly timer MUST be no less than 5 seconds.  It is
+   recommended that fragmentation be avoided by applications wherever
+   possible.
+
+   Implementation of fragmentation is optional; an implementation that
+   does not support fragmentation MUST drop any datagram whose FRAG
+   field is other than X'00'.
+
+
+
+Leech, et al                Standards Track                     [Page 8]
+

+RFC 1928                SOCKS Protocol Version 5              March 1996
+
+
+   The programming interface for a SOCKS-aware UDP MUST report an
+   available buffer space for UDP datagrams that is smaller than the
+   actual space provided by the operating system:
+
+          o  if ATYP is X'01' - 10+method_dependent octets smaller
+          o  if ATYP is X'03' - 262+method_dependent octets smaller
+          o  if ATYP is X'04' - 20+method_dependent octets smaller
+
+8.  Security Considerations
+
+   This document describes a protocol for the application-layer
+   traversal of IP network firewalls.  The security of such traversal is
+   highly dependent on the particular authentication and encapsulation
+   methods provided in a particular implementation, and selected during
+   negotiation between SOCKS client and SOCKS server.
+
+   Careful consideration should be given by the administrator to the
+   selection of authentication methods.
+
+9.  References
+
+   [1] Koblas, D., "SOCKS", Proceedings: 1992 Usenix Security Symposium.
+
+Author's Address
+
+       Marcus Leech
+       Bell-Northern Research Ltd
+       P.O. Box 3511, Stn. C,
+       Ottawa, ON
+       CANADA K1Y 4H7
+
+       Phone: (613) 763-9145
+       EMail: mleech@bnr.ca
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+Leech, et al                Standards Track                     [Page 9]
+
+

+Html markup produced by rfcmarkup 1.108, available from +http://tools.ietf.org/tools/rfcmarkup/ + + diff --git a/scratch/remoting/socks5.ws/socks5.js b/scratch/remoting/socks5.ws/socks5.js new file mode 100644 index 0000000..f3558bc --- /dev/null +++ b/scratch/remoting/socks5.ws/socks5.js @@ -0,0 +1,408 @@ +/* socks5.js + * + * Partially implements the SOCKS5 protocol over WebSockets + * The missing parts are around the types of authentication: + * does NOT support GSSAPI, [and has no means for plugging in new ]. + * + * Depends on WebSocketStream (which depends on ayepromise and some WebSocket library). + */ + +/* TODO + * + * + * [ ] We *should* be sending binary frames (i.e. have websocket frame opcode 0x2 i.e. use Blob()s) + * The SOCKS headers are binary, except for the DOMAINNAME address type, which is presumably ASCII. But after the SOCKS headers, we should get out of the way and use whatever form the user or remote end hands us (getting this right will mostly be an exercise in getting unit tests right) + * [ ] We should be able to parse and unparse IP addresses + since SOCKS transfers them in plain bytes, and the + header we send depends on the format we send the address in. + * [ ] Instead of using Strings everywhere, use integer enum codes and have a single function that does conversion between ints and (network ordered!) byte strings + * [ ] factor the common parts of the protocol in some way that allows implementing all three kinds of APIs is feasible + * [ ] unit tests!! + */ + +// UMD header +(function (root, factory) { + if (typeof define === 'function' && define.amd) { + define(factory); + } else if (typeof exports === 'object') { + module.exports = factory(); + } else { + root.SOCKS5 = factory(); + } +}(this, function () { + 'use strict'; + +if(!WebSocketStream) { + var WebSocketStream = require("./websocketstream.js") +} + +/* Utility Routines */ + +function split_addr(addr) { + // I would rather use the more general 'rsplit()' + // but js doesn't have that and writing it myself is fraught: + //http://stackoverflow.com/questions/958908/how-do-you-reverse-a-string-in-place-in-javascript/16776621#16776621 + + var split = addr.lastIndexOf(":") + var host = addr.slice(0, split) + var port = addr.slice(split+1) + return [host, port] +} + +function join_addr(host, port) { + return host + ":" + port; +} + + +function build_int(n) { + if(!(0 <= n && n<=0xFF)) throw "Integer out of range. SOCKS integers must fit in one byte" + return String.fromCharCode(n); +} + + +function SOCKS5(proxy, target, user, pass) { + /* implements the SOCKS5 client protocol + * reference: https://tools.ietf.org/html/rfc1928 + * + * This class only implements TCP CONNECT SOCKS; SOCKS also allows UDP and even BIND modes, but those types require distinctly different APIs (UDP needs .send() and .onmessage; BIND only allows one remote TCP connection (i.e. it's not a full listen()+accept() implementation) but presumably there should be an intermediate state for "connected to the proxy but no one is connected to us" + + */ + + //A) initializations + + var self = this; + self.target = target + self.remote = { host: null, port: null } //the address of the remote end of the tunnel + + self._ws = new WebSocketStream(proxy) + + self._ws.onopen = function() { + self._connect() + } + + self._ws.onclose = function() { + self.onclose() + } + self._ws.onerror = function() { + self.onerror() + } +} + +// make SOCKS5 inherit from WebSocketStream +// trick from http://ncombo.wordpress.com/2013/07/11/javascript-inheritance-done-right/ +//SOCKS5.prototype = Object.create(WebSocketStream.prototype); + + +SOCKS5.prototype._validate_version = function(b) { + if(b != this.VERSION) { + throw "Unsupported SOCKS version" + } +} + + +// B) connect to the SOCKS server + +SOCKS5.prototype._connect = function() { + var self = this; + return this._negotiate_method() + .then(function(m) { return self._negotiate_auth(m) }) //the wrapping is because then() suffers from changing what 'this' is + .then(function() { return self._negotiate_connection() } ) + .then(function() { return self.onopen({/*XXX fill me in*/}) }) + .fail(function(e) { self.onerror(e) }) //chain exceptions out to the event handler +} + + + +// 1) negotiate an connection method (i.e. an auth method, though encryption and digital signatures are theoretically an option here too); + +SOCKS5.prototype._negotiate_method = function() { + this._ws.send(this._build_method_selection([this.auth.NONE])) + + return this._read_method(); +} + + + +SOCKS5.prototype._build_method_selection = function(methods) { + // precondition: methods is a subset of this.auth + // XXX this precondition isn't enforced! + + var nmethods = methods.length; + + return this.VERSION + build_int(nmethods) + methods.join('') +} + + +SOCKS5.prototype._read_method = function() { + var ws = this._ws; + var self = this; + return ws.recv(1) + .then(function(b) { self._validate_version(b) }) + .then(function() { return ws.recv(1); }) +} + + +// 2) negotiate the authentication; +// the spec says we "MUST" support GSSAPI, which I'm not going +// to do, and "SHOULD" support user/pass, which I am. + + +SOCKS5.prototype._negotiate_auth = function(method) { + var self = this; + // look up a handler for 'method' + // points: this handler may return a promise, but it also might not + // this handler is run with this = [the SOCKS5] + if(method == this.auth.UNACCEPTABLE) { + + //"If the selected METHOD is X'FF', none of the methods listed by the + // client are acceptable, and the client MUST close the connection." + this._ws.close(); //"client MUST close" + + //and we error out too, for good measure + throw "SOCKS server rejected all our auth methods." + } + + function find_method() { + var s = null; + // .find() didn't work. for(k in self.auth) didn't work. + // maybe things are different under Firefox?? + // So I fall back to using a global 's' + Object.keys(self.auth).forEach(function(k) { //XXX this code feels like it probably exists in the stdlib somewhere + if(self.auth[k].charCodeAt(0) == method.charCodeAt(0)) s = k; + }) + + if(s) return s; + throw "Unknown auth method" //Shouldn't happen but not impossible; a conforming server should only respond with a method in the list we sent + } + + var handler = this.authmethods[find_method()] + + // Note! handler might here overwrite this._ws here with a further + // wrapper because: + // > If the negotiated method includes encapsulation [...] + // > these requests MUST be encapsulated in the method- + // > dependent encapsulation. + // - + return handler.call(this) +} + + +SOCKS5.prototype.authmethods = {} +SOCKS5.prototype.authmethods.NONE = function() { + return; +} + +SOCKS5.prototype.authmethods.GSSAPI = function() { + throw "NotImplemented" +} +SOCKS5.prototype.authmethods.LOGIN = function() { + throw "NotImplemented" + //this.send(....) + //return ws.read(loginresponselength).then() .... +} + + +// 3) request the actual tunnel +SOCKS5.prototype._negotiate_connection = function() { + + this._ws.send(this._build_request("CONNECT", this.target)) //hardcoded to "CONNECT"; see the comments near the top + + return this._read_reply(); +} + + +SOCKS5.prototype._build_request = function(command, address) { + command = command.toUpperCase(); + + command = this.commands[command] + if(command === undefined) { + throw "Invalid SOCKS command." + } + + // XXX for now, address is hardcoded as DOMAINAME + // most DNS resolvers should be able to handle text-formatted IPv4 and IPv6 addresses... + var atype = this.atype.DOMAINNAME + + address = split_addr(address) + var host = address[0] + var port = address[1] + + // format address as a fortran-style string + if(host.length > 0xFF) { + throw "Target hostname too long to encode." + } + var n = String.fromCharCode(host.length) + host = n + host + + // and finally, the port + if(port === null) { //XXX maybe this should be inside of spit_addr + throw "Target port must be specified when using SOCKS5." + } + + // "in network octet order" + port = +port; //convert to an integer + port = String.fromCharCode((port & 0xFF00) >> 8) + String.fromCharCode(port & 0xFF) + + + var m = this.VERSION + command + this.RSV + atype + host + port; + return m +} + +SOCKS5.prototype._read_reply = function() { + // As a state machine, this process is: + // [ read version ] -> [ read response ] -> [read reserved null byte] -> [error out] + // |-> [error out] | + // v + // [read address type] + // [read ipv4] [read domainname] [readipv6] + // [read port] + // + // Because the message is a fixed size up to reading the address, I avoiding having to kludge + // around this by just saying .recv(4) and then using standard if statements instead of a chain of .recv(1)s + // but because this step comes after the split, it needs to + + + //the trick here is that promises chain: .then() records the handler you pass it and then returns a new promise which will be fired after that promise completes and finishes the handler + // how do I write branching with promises? + // Promises/A+ makes it easy enough to write a chain of steps and + // get async almost free (the only expense is some repetition: .then().then().then()....) + // MSFT even has an excellent doc on doing this: http://msdn.microsoft.com/en-us/library/windows/apps/Hh700334.aspx + // + // In principle, you should be able to have a promise that represents + // the final result of a branching + // How to express this is escaping me at the moment. + + //NB: the non-lint'd indenting is on purpose here! + // The correct indents would distract, because the .then()s + // are basically boilerplate around the real process. + + var self = this; + var ws = this._ws; + + // check the remote server version + return ws.recv(1) + .then(function(b) { return self._validate_version(b) }) + + // parse the response type + .then(function() { return ws.recv(1) }) + .then(function(b) { + if(b != self.responses.OK) { + throw "SOCKS tunnel refused" + //TODO: give more detailed error message based on what b is + } + }) + + // check that the 'reserved' byte is actually unused; + // if it's not, we might be not talking to SOCKS + .then(function() { return ws.recv(1) }) // NB: Promises/A+ says that you can chain promises: http://promisesaplus.com/#point-49 + .then(function(b) { + if(b != self.RSV ) { + throw "Malformed SOCKS reply" + } + }) + + + // determine the length of the next field, which is "bind.addr", + // telling us what our remote host is + .then(function() { return ws.recv(1) }) + .then(function(b) { + switch(b) { + case self.atype.IPv4: //ipv4: 4 bytes + return ws.recv(4).then(function(addr) { + //TODO: parse the bytes into a IP string + self.remote.host = addr; + }) + break; + case self.atype.DOMAINNAME: // domain name: a fortran-style string (so we need to read 1 byte to find out the length) + return ws.recv(1).then(function(h) { + h = h.charCodeAt(0) //extract the number of bytes to read + ws.recv(h).then(function(addr) { + //the string as given is a string + self.remote.host = addr; + }) + }) + break; + case self.atype.IPv6: //ipv6: 16 bytes + return ws.recv(16).then(function(addr) { + //TODO: parse the octets into a string + self.remote.host = addr; + }) + break; + default: + throw "Received unknown address type"; + } + }) + + // finally, read the port + .then(function() { return ws.recv(2) }) + .then(function(b) { + self.remote.port = b.charCodeAt(0) << 8 | b.charCodeAt(1) + }) + +} + + +// C) Get out of the way: just forward packets +// + +SOCKS5.prototype.recv = function(n) { + // XXX I don't think I'm doing this right + // TODO: ensure that recv() on the outer can be called before the SOCKS negotiation is done + // it should block until + // XXX as written this could interfere with the SOCKS negotiation and totally screw up everything + + return this._ws.recv(n) +} + +SOCKS5.prototype.recvline = function() { + // XXX I don't think I'm doing this right + // TODO: ensure that recv() on the outer can be called before the SOCKS negotiation is done + + return this._ws.recvline() +} + +SOCKS5.prototype.send = function(d) { + return this._ws.send(d) +} + +SOCKS5.prototype.close = function() { + return this._ws.close(); +} + +SOCKS5.prototype.onopen = function(evt) {} +SOCKS5.prototype.onclose = function(evt) {} +SOCKS5.prototype.onerror = function(evt) {} + +// These constants are hardcoded to correspond to their encoding within the protocol +// SOCKS is simple enough that the constants it uses are all single bytes. +SOCKS5.prototype.VERSION = String.fromCharCode(5) // i.e. SOCKS version 5 +SOCKS5.prototype.RSV = String.fromCharCode(0) //RESERVED byte + +SOCKS5.prototype.auth = { + NONE: String.fromCharCode(0), + GSSAPI: String.fromCharCode(1), + LOGIN: String.fromCharCode(2), + UNACCEPTABLE: String.fromCharCode(0xFF), + // all others are reserved either by IANA or for custom use + // i.e. probably no one uses them(?) + } + + +SOCKS5.prototype.commands = {CONNECT: String.fromCharCode(1), + BIND: String.fromCharCode(2), + UDP: String.fromCharCode(3)} + +SOCKS5.prototype.atype = { + IPv4: String.fromCharCode(1), + DOMAINNAME: String.fromCharCode(3), + IPv6: String.fromCharCode(4) + } + + +SOCKS5.prototype.responses = {OK: String.fromCharCode(0), + //TODO: there's 8 possible errors + } + + +return SOCKS5; +})); \ No newline at end of file diff --git a/scratch/remoting/socks5.ws/test.socks5.js b/scratch/remoting/socks5.ws/test.socks5.js new file mode 100644 index 0000000..8410a08 --- /dev/null +++ b/scratch/remoting/socks5.ws/test.socks5.js @@ -0,0 +1,50 @@ + +var SOCKS5 = require("./socks5.js") + + + +process.argv.shift()//this removes "/usr/bin/node" and should be a standard part of the startup; it is for python + + +var target = process.argv[1] +if(!target) { + target = "uwaterloo.ca:80" +} + +var prx = new SOCKS5("ws://localhost:8081", target) + +prx.onopen = function() { + console.log("Requesting HTTP from ", target) + prx.send("GET / HTTP/1.1\r\n\r\n") + + prx.recv().then(function(e) { + console.log("Response:") + console.log(e); + }) +} + +prx.onerror = function(e) { + console.log("It blew up!") + console.log(e) +} + + +setTimeout(function() { + console.log("timeout") + prx.close(); +}, 15*1000) +/* +or proxy via + +var prx = new SOCKS5("ws://proxy-host.com:3535", "tor.kousu.ca:22") + +and prx is simply + +// or with our (currently imaginary) SSH client library: +var ssh = new SSH(prx) +ssh.onconnect = function(e) { + ssh.login("user", "pass") +} +ssh.onlogin = function(e* + +*/ diff --git a/scratch/remoting/socks5.ws/test.websocketstream.js b/scratch/remoting/socks5.ws/test.websocketstream.js new file mode 100644 index 0000000..a071817 --- /dev/null +++ b/scratch/remoting/socks5.ws/test.websocketstream.js @@ -0,0 +1,11 @@ + +// run websockify 8081 --unix-target=/tmp/ab & nc -vUl /tmp/ab to construct a server +var WebSocketStream = require("./websocketstream.js") +var ws = new WebSocketStream("ws://localhost:8081") +ws.onclose = function() { console.log("closed") } + +ws.recv(6).then(function(d) { + // NOTE: WebSocketStream by default passes text + // I am unsure if . It is probably up to the websocket library. + console.log("read these 6 characters: ", d) +}) diff --git a/scratch/remoting/socks5.ws/test.websocketstream.socks5.js b/scratch/remoting/socks5.ws/test.websocketstream.socks5.js new file mode 100644 index 0000000..36d7c6a --- /dev/null +++ b/scratch/remoting/socks5.ws/test.websocketstream.socks5.js @@ -0,0 +1,27 @@ + +// run websockify 8081 --unix-target=/tmp/ab & nc -vUl /tmp/ab to construct a server +var WebSocketStream = require("./websocketstream.js") +var ws = new WebSocketStream("ws://localhost:8081") +ws.onclose = function() { console.log("closed") } + +ws.onopen = function() { + +ws.send("\x05\x01\x00") +ws.recv(2).then(function(d) { + // NOTE: WebSocketStream by default passes text + // I am unsure if . It is probably up to the websocket library. + console.log("read these: ", d.charCodeAt(0), d.charCodeAt(1)) +}).then(function() { + ws.send("\x05\x01\x00\x03\x0Cuwaterloo.ca\x00\x50") +}).then(function() { return ws.recv(4) }) +.then(function(d) { + console.log("read these: ", d.charCodeAt(0), d.charCodeAt(1), d.charCodeAt(2), d.charCodeAt(3)) + ws.send("GET / HTTP/1.1\r\n\r\n") +}).then(function() { + return ws.recv() +}).then(function(page) { + console.log(page) +}) + + +} diff --git a/scratch/remoting/socks5.ws/websocketstream.js b/scratch/remoting/socks5.ws/websocketstream.js new file mode 100644 index 0000000..5011b96 --- /dev/null +++ b/scratch/remoting/socks5.ws/websocketstream.js @@ -0,0 +1,191 @@ +// UMD header +(function (root, factory) { + if (typeof define === 'function' && define.amd) { + define(factory); + } else if (typeof exports === 'object') { + module.exports = factory(); + } else { + root.WebSocketStream = factory(); + } +}(this, function () { +'use strict'; + +if(!WebSocket) { + var WebSocket = require("ws"); +} +if(!ayepromise) { + var ayepromise = require("ayepromise") +} + +/* +this class handles the buffering that WebSocket doesn't +providing the recv() method familiar from synchronous socket code. +Since it returns promises that conform to Promises/A+, the resulting code +is very nearly the same as the equivalent synchronous code. +[[aside: promises are continuation-passing-style indirected]] + +It has the same (XXX not finished) API as WebSocket, except that + .onmessage = handler is replaced by .recv(n, handler) and .recvline(handler) +TODO: is there any way to achieve this with inheritence? can I inherit and somehow zero out onmessage externally while still using onmessage internally? + +.recvline() blocks until '\n' comes in +.recv(n) blocks until n bytes are ready +.recv() buffer until websocket closes +NOTE: only one of these can be active at a time; if you try to call recv() while a recv() is pending you will get an exception + +*/ + +/* TODO: + * + * [ ] do we want perhaps to add a nonblocking flag so that recv() is EITHER blocking or immediate polling? but the only way to deal with nonblocking sockets is polling, and we probably want to discourage polling in javascript... + + * [ ] handle the different modes you can open a websocket in: strings, base64, and binary; + The websocket protocol technically allows intermixing text and binary (and base64 is a websockify addition) frame by frame. Internally, this means we need to our buffer to be tolerate receiving in all three modes. + Sending is not as much of a problem, because we just relay that down to WebSocket itself, and it knows how to handle the differences, though perhaps a base64 mode would help. + + * [x] WebSockify, which I'm testing against (and which this is mostly useful with) insists on me specifying "binary" or "base64" in the protocols argument. + [ ] Make sure upstream accepts my patch + [ ] Support the "protocols" option somehow. +*/ + + +// XXX a complication: +// +// with 'binary' e.data ends up as a Blob which requires a whole slew of JS objects to manipulate: FileReader and ArrayBuffer. FileReader.readAsBinaryString + FileReader.result is the shortest way to just getting the bytes out, however readAsBinaryString has been deprecated for 2 years. +// readAsText assumes the data is UTF-8 (or otherwise specified) text and will decode as approproate +// readAsArrayBuffer just pushes the trouble of manipulation to learning the ArrayBuffer class +// +// For development, I'm going to force base64. I don't know if non-WebSockify WebSocketServers will understand it, but I can cross that bridge later. + +function WebSocketStream(addr) { + var self = this; //so that we can refer to the WebSocketStream from the WebSocket's event handlers + + this._buffer = ""; //TODO: use something more efficient than strings here ( see https://github.com/phoboslab/jsmpeg/blob/master/jsmpg.js#L90 for how ) + + this._pending = null; + + this._ws = new WebSocket(addr, "binary"); + + this._ws.onmessage = function(e) { + self._pushbuffer(e.data); + } + + // these handlers which proxy need to be defined in here + // in order to pick up 'self' and not cause infinite recurision + // saying .e.g this._ws.onopen = this._onopen does the wrong thing, because js doesn't care that this._onopen comes from this, it runs it with this=this._ws + this._ws.onopen = function(e) { + if(self.onopen) { + self.onopen(e) + } + } + this._ws.onclose = function(e) { + // clear out a pending .recv() + if(self._pending !== null) { + if(self._pending.type == self._RECV) { + self._pending.deferred.resolve(self._buffer) + } + } + if(self.onclose) { + self.onclose(e) + } + } + this._ws.onerror = function(e) { + if(self.onerror) { + self.onerror(e) + } + } + + +} + +WebSocketStream.prototype._pushbuffer = function(d) { + var self = this; + + self._buffer += d; + //console.log("[",this._buffer.length,"] buffer = ", self._buffer) //DEBUG + + // if we have a recv() or a recvline() blocked, scan + if(self._pending !== null) { + var split = -1; // if not -1, determines how many bytes to eat and call handler with, and causing the pending recv to be completed + + if(self._pending.type == self._RECVn) { + if(self._buffer.length >= self._pending.n) { + split = self._pending.n; + } + } else if(self._pending.type == self._RECVLINE) { + split = self._buffer.indexOf("\n") + } + + if(split != -1) { + var d = self._buffer.slice(0, split) + self._buffer = self._buffer.slice(split) + + var p = self._pending; + // "complete" the pend; i.e. null out _pending + self._pending = null; //it's important to complete *before* resolving, + //since the resolution handler might--is likely to, even--call recv() again + p.deferred.resolve(d) + } + } +} + + //TODO: .prototoype._recv = { NONE: 0, ..} +WebSocketStream.prototype._RECVNONE = 0 +WebSocketStream.prototype._RECVn = 1 +WebSocketStream.prototype._RECVLINE = 2 +WebSocketStream.prototype._RECV = 3 //XXX NotImplemented + +WebSocketStream.prototype.send = function(data) { + + if(typeof(data) === "string") { + //Bad Things Happen if we let the websocket library send the data, + // namely it UTF-8 encodes it + //XXX this is wrong! the client code should have control here. + // Maybe it really does want to send UTF8?? + var _data = data; + data = new Uint8Array(_data.length); + for(var i = 0; i/dev/null; HERE=`pwd`; popd >/dev/null + +source $HERE/pg_vars.sh + +psql diff --git a/scratch/psql/dump.sh b/src/backend/db/dump.sh similarity index 79% rename from scratch/psql/dump.sh rename to src/backend/db/dump.sh index 8de921f..c74c5a5 100755 --- a/scratch/psql/dump.sh +++ b/src/backend/db/dump.sh @@ -6,6 +6,6 @@ # pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null -cd $HERE +source $HERE/pg_vars.sh -pg_dump -h $HERE/data/ -d postgres +pg_dump diff --git a/src/backend/db/init.sh b/src/backend/db/init.sh new file mode 100755 index 0000000..d8732cd --- /dev/null +++ b/src/backend/db/init.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +# usage: init.sh dbname +# +# constructs a postgres instance, the database "dbname", and installs the replication hooks +# once you run this, run server.sh to use the database + +# TODO: figure out how to chain the steps in this script so that the hooks are not installed if the CREATE DATABASE failed + +pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null +source $HERE/pg_vars.sh + +if [ `uname` = "Darwin" ]; then + # OS X has several possible python distros + # our supported configuration uses Postgres.app, + # which is built linked against the Apple distro of python + # so we need to ensure the Apple python is the one loaded + # or else strange library errors will crop up + # + # Unfortunately, I'm having trouble building psycopg2 against the system python, so we clients (ie replicate.py) has to be run with Anaconda Python + # + # XXX this was copypasted from server.sh for the case where we need to spawn a server + # it would be better if we could instead call server.sh--except we (purposely) don't have daemon control of server.sh + + export PATH=/usr/bin:$PATH +fi + + +if [ ! -d $PGDATA ]; then + initdb +fi + +if pg_ctl status 2>&1 >/dev/null; then + ALREADY_RUNNING=1 +fi + +if [ ! $ALREADY_RUNNING ]; then + pg_ctl -w start -o "-k $PGHOST" +fi + +if [ x"$PGDATABASE" != x"postgres" ]; then #we know postgres makes the "postgres" database by default, so don't try to CREATE it again + psql -d postgres -h $PGDATA </dev/null; HERE=`pwd`; popd >/dev/null + +if [ ! -z $1 ]; then + export PGDATABASE=$1 +fi + +if [ -z $PGDATABASE ]; then + export PGDATABASE="postgres" # initdb makes a default database called "postgres" so we just ride on that as the default +fi + +if [ -z $PGDATA ]; then + export PGDATA=$HERE/data #<-- this construction makes $PGDATA an absolute path +fi + +if [ -z $PGHOST ]; then + export PGHOST=$PGDATA #pqsl takes absolute-path PGHOST to mean "use Unix Domain not TCP" +fi + +# we don't need to set PGPORT, but be aware that if it is set it changes the name of the listening socket file + diff --git a/src/backend/db/replicate.py b/src/backend/db/replicate.py new file mode 100755 index 0000000..7cefdea --- /dev/null +++ b/src/backend/db/replicate.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python +# usage: replicate table --> stdout posts a line-oriented stream of HRDJ +# third-time's-a-charm edition +# depends: sqlalchemy + +# depends: watch.pysql and replicate.pysql are loaded into postgres + +""" +TODO: + irritant: select() blocks ctrl-c, because it's an I/O wait (quick fix: give a long timeout on select and spin it in a polling loop) + +clean up the sql injections + +clean up naming ("my_pg"? really?) + +support views (filtered replication) ---- doing this right requires some finesse. I have the logic written down in replicant.py, but distinguishing each client from each other will be tricky. + +consider whether we can ditch SQLAlchemy; all this script does is issue simple SQL commands. + speaking DBAPI directly would be less heavy + + +unregister is not getting called on ctrl-c or other exceptions, even though I've + --> since the with: is inside of a generator,there's paths through the code which quit without going through the .__exit__(). Very annoying. + ...but it should mostly be blocking in Changes.__next__, so why is this such an obvious problem?? + +Concurrency bugs: + by adding artificial stalls (time.sleep()) to the script, you can demonstrate these to yourself + 1) writes that occur after "cur = " but before "changes = " go to the great bit bucket in the sky + solutions: + a. figure out some way to ask postgres for timeline location id (xid)s and ask it to replay from those certain poitns + b. Use an explicit table lock around acquiring the two cursors (NB: the changes cursor is not a SQL cursor) + - http://www.postgresql.org/docs/current/static/sql-lock.html / http://www.postgresql.org/docs/current/static/explicit-locking.html + ...i think that's the only one. We could get the changes cursor first, and then be in the equally difficult situation of having double-writes (so, a delete would show up as a nonexistent row and then a delete which would get confused, an insert would show up as a duplicated row, an update would show up as both -- or maybe it would show up as unable to be applied since the old row would). If a human was doing this work, these sorts of errors would be manageable, but for a computer this is equally difficult. + but since we make sure to (let the kernel) buffer changes for us simultaneously to the current state being written out, we should at least not miss anything in the gaps of writing out the current state. + +""" + +import sys +import os, tempfile +import socket, select +import signal, threading + +import sqlalchemy, json + + +import logging +logging.getLogger().setLevel(logging.DEBUG) + + +if os.uname().sysname == "Darwin": + # there's deployment issues on OS X: + # http://stackoverflow.com/questions/16407995/psycopg2-image-not-found + # this is an unfinished hack to get around it which might not even work + os.environ["DYLD_LIBRARY_PATH"] = "/Applications/Postgres.app/Contents/Versions/9.3/lib/" + #+ ":" + os.environ.setdefault("DYLD_LIBRARY_PATH","") + try: + import psycopg2 + except ImportError: + sys.stderr.write("Unable to import psycopg2 under OS X. Do you have Postgres.app installed? Do you have DYLD_FALLBACK_LIBRARY_PATH set to point at it?") + # TODO: print traceback here + sys.exit(3) + + + + +def shutdown(): + """ + Tell all the spools in this program that are blocked on select() to stop. + + This should be triggered by + SIGINT + SIGQUIT + SIGTERM + and by stdin or stdout closing + but not by + SIGKILL (which we cannot catch anyway) + SIGSTOP (which we cannot catch anyway) + SIGABRT (which we can catch but means roughly the same as SIGKILL: "die without cleaning up") + and not by stderr + TODO: write tests which cover all these cases + """ + global shutdown_socket + shutdown_socket.close() + +def shutdown_on_signal(SIG, stack_frame): + shutdown() + + +def alivethread(): + """ + poll stdin for EOF, and signal shutdown when that occurs + """ + while True: + select.select([sys.stdin],[],[]) #block + if sys.stdin.read(1) == "": #EOF + break + shutdown() + + + + +class Changes: + MTU = 2048 #maximum bytes to read per message + + def __init__(self, table): #TODO: support where clauses + self._table = table + self._stream_id = None + + def __enter__(self): + # set up our listening socket + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._sock.bind(tempfile.mktemp(prefix="pg_replicate_%s" % (self._table,))) #XXX RACE CONDITION; mktemp.__doc__ specifically says "this function is unsafe"; however, how else to make a socket? + # also using the table name in the file name is probably poor form + logging.info("listening for changes to %s on %s", self._table, self._sock.getsockname()) + + # register ourselves with the source + # XXX SQL injection here + # autocommit is explicitly turned on because SQLAlchemy assumes all selects are mutationless + # the docs expplicitly cover how to workaround this: http://docs.sqlalchemy.org/en/rel_0_9/core/connections.html#understanding-autocommit + C = E.connect() + r = C.execution_options(autocommit=True).execute("select * from my_pg_replicate_register('%s', '%s')" % (self._table, self._sock.getsockname())) + r = r.scalar() + logging.debug("register() result is: %s", r) + C.close() + + self._stream_id = r + + return self #oops! + + def __exit__(self, *args): + + # unregister ourselves + # XXX SQL injection here + C = E.connect() + r = C.execution_options(autocommit=True).execute("select * from my_pg_replicate_unregister('%s')" % (self._stream_id)) + r = r.scalar() + logging.debug("unregister() result is: %s", r) + C.close() + + # shutdown the socket + os.unlink(self._sock.getsockname()) # necessary because we're using unix domain sockets + self._sock.close() + + def __iter__(self): + return self + + def __next__(self): + fread, fwrite, ferr = select.select([self._sock, shutdown_listener], [], [self._sock, shutdown_listener]) #block until some data is available + if ferr: + pass #XXX what should we do in this case? + else: + if shutdown_listener in fread: #check for EOF from shutdown_socket + raise StopIteration + pkt = self._sock.recv(Changes.MTU) + pkt = pkt.decode('utf-8') + return pkt + + next = __next__ #py2 compatibility + + +def replicate(table): + # XXX we might need to construct the Changes stream first + # If we do have concurrency problems, doing that at least guarantees that we don't miss any changes, though we might end up with duplicate rows or trying to delete nonexistent rows + + # 2) get a handle on the change stream beginning at the commit postgres was at *now* (?? maybe this involves locking?) + # XXX we're relying on the time between the select and Changes.__enter__() to be small enough to be atomically + # but that's never absolutely true so this has a race condition! + # It would be better if we could ask postgres + # "what is the lamport clock of our previous select", which postgres internally records [CITE: ....] + # And then say "with Changes(_table,[ columns,][ where,] from=clock)" + # (NB: a lamport clock is a count of events; it has nothing to do with real time except that it always increases in time, making it suitable for synchronizing concurrent processes even when their system clocks might skew) + # but postgres doesn't seem(?) to provide a way to extract; it just uses timelines to make sure every session sees a consistent set of data. + # this is sort of tricky + # I need to say somethign like + + with Changes(table) as changes: #<-- use with to get the benefits of RAII, since Changes has a listening endpoint to worry about cleaning up + + # README: BUGFIX: the change to raw_connection() caused a deadlock which only occurs the first time register() is called: register() needs to create a trigger on _table, but cur holds a lock on _table + # it seems, however, that reordering the instructions avoids the deadlock + # and i was already considering doing this; this order means we potentially have overlapping state in the Changes and cur feeds + + # 1) get a cursor on the current query + + # XXX question: are we allowed to have multiple results open during a single connection? Does this cause deadlocks? This program only uses *one* result set: the Changes feed is not coming from a resultset, it's listening to a socket. + + # get current state with SQLAlchemy #XXX commented out because this was causing a deadlock; TODO: revisit and see if you can avoid using raw_connection() + # stream_results is turned on for this query so that this line takes as little time as possible + #cur = C.execution_options(stream_results=False).execute("select * from %s" % (_table,)) + + + C_DBAPI = E.raw_connection() + cur = C_DBAPI.cursor() + try: + cur.execute("select * from %s" % (table,)) #XXX SQL INJECTION HERE + + # 3) spool out the current state + # --------------------------------------------------- + keys = [col.name for col in cur.description] #low level SQLAlchemy (psycopg2, in this case) + #keys = cur.keys() #SQLAlchemy + for row in cur: + # if we've been told to shutdown + # fail-fast + if select.select([shutdown_listener], [], [],0)[0]: # the ",0" is key here! it means "do polling" instead of "do blocking" + break + + # else + # spool the next row + row = dict(zip(keys, row)) #coerce the SQLAlchemy row format to a dictionary + #convert row to our made up delta format ("HDRJ") + delta = {"+": row} #pretend that the existing rows are actually inserts + delta = json.dumps(delta) #and then to JSON + yield delta + # do I need to explicitly close the cursor? + finally: + cur.close() + C_DBAPI.close() + + # 4) spin, spooling out the change stream + # --------------------------------------------------- + for delta in changes: + # we assume that the source (watch_table()) has already jsonified things for us; THIS MIGHT BE A MISTAKE + yield delta + # NOTREACHED (unless something crashes, the changes feed should be infinite, and a crash would crash before this line anyway) + + + + +def main(): + postgres, table = sys.argv[1:] + + assert postgres.startswith("postgresql://"), "Must be a valid sqlalchemy-to-postgres connection string" #XXX there are connection strings which spec the driver to use which are valid too; this assertion is too strong + #DB_SOCKET = "data" #path to folder containing the postgres socket + #DB_SOCKET = os.path.abspath(DB_SOCKET) #postgres can't handle relative paths + #DB_CONN_STRING = "postgresql:///postgres?host=%s" % (DB_SOCKET,) + global E + E = sqlalchemy.create_engine(postgres) #TODO: deglobalize + + global shutdown_socket, shutdown_listener + shutdown_socket, shutdown_listener = socket.socketpair() #when this socket closes all spools (all two of them, but it could be generalized) should shut down immediately + + for SIG in [signal.SIGTERM, signal.SIGQUIT]: #XXX do I need to set SIGINT here, or does python's default of raising KeyboardInterrupt serve well enough? + signal.signal(SIG, shutdown_on_signal) + + # start up the thread to monitor stdin for EOF (i.e. when a client disconnects) + # we do not need to hold onto this thread because it is a daemon and has no clean up to do + # so we can start it immediately + threading.Thread(target=alivethread, daemon=True).start() + + for delta in replicate(table): + #print(delta, flush=True) #py3 + print (delta); sys.stdout.flush() #py2/3 + + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/scratch/psql/replicate.pysql b/src/backend/db/replicate.pysql similarity index 73% rename from scratch/psql/replicate.pysql rename to src/backend/db/replicate.pysql index 4c603f8..b4a6e1d 100644 --- a/scratch/psql/replicate.pysql +++ b/src/backend/db/replicate.pysql @@ -6,6 +6,8 @@ -- TODO: -- [ ] Install these hooks into a spare tablespace -- [ ] Make sure these hooks can be deployed to production servers +-- [ ] Do load tests to make sure these hooks don't lag out the server; actually formatting and compacting the IP packets should perhaps be pushed out to an external process +-- [ ] Fix the SQL Injections (the table ones are a problem; maybe the postgres mailing list will have some ideas; maybe plpy provides a sanitize() function CREATE OR REPLACE LANGUAGE plpython2u; @@ -61,15 +63,28 @@ $$ print("\tto") sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM); for o in clients: - print(o) + print("\t",o) try: event = event.encode("utf-8") plpy.debug("\twriting |%s| to %s" % (event, sock.getsockname())) sock.sendto(event, o['addr']) except Exception as e: - warn("unable to write to client socket %s: %s" % (o['addr'], e)) - pass + warn("\tunable to write to client socket %s: %s" % (o['addr'], e)) + + # Forcibly unregister a miss + # Be aware that this only happens when someone writes to + # so even though this is defensive coding, it does not totally prevent an attacker from eating up resources + # so long as that attacker can cause large numbers of phantom register()s between writes + # BEWARE: if you are trying to debug the sockets, consider commenting this out + # so that you can use netcat to listen at will to the backend sockets + plant = plpy.prepare("select my_pg_replicate_unregister($1)", ['text']) + unreged = plpy.execute(plant, [o['stream_id']]) + unreged = unreged[0].values()[0] #extract scalar return code + if unreged: + warn("\t\tForcibly unregistered %s" % (o['addr'],)) + else: + warn("\t\tUnable to unregister %s" % (o['addr'],)) sock.close() print("") @@ -133,6 +148,11 @@ CREATE TABLE my_pg_replicate_clients (stream_id text PRIMARY KEY, addr text UNIQ select 'installing my_pg_replicate_triggers'; -- a simple data structure to refcount how active clients are watching any table -- when the number of clients drop to zero, we can remove the parasitic trigger +-- Note: in SQL theory, this is a form of denormalization (ie data redundancy), +-- since the values in this table (should be) equal to +-- select _table, count(_table) from my_pg_replicate_clients group by _table; +-- I argue that we want to trade redundancy for speed in this case, since otherwise +-- each unregister() could take a very long time, at least until a significant number of unregister()s happen DROP TABLE IF EXISTS my_pg_replicate_triggers; CREATE TABLE my_pg_replicate_triggers (_table text PRIMARY KEY, uses int); @@ -154,12 +174,15 @@ $$ r = plpy.execute("select _table from my_pg_replicate_triggers where _table = '%s'" % (_table,)) #import IPython; IPython.embed() if r.nrows(): - #XXX SQL injection - plpy.execute("update my_pg_replicate_triggers set uses = uses + 1 where _table = '%s'" % (_table,)) + plant = plpy.prepare("update my_pg_replicate_triggers set uses = uses + 1 where _table = $1", ['text']) + plpy.execute(plant, [_table]) else: # table is currently unwatched; generate a random name and install the trigger # XXX edge case: if we drop the replicate_triggers table while triggers are still active (eg reloading this file) it's possible that the trigger will already be installed # Since the number of times this happens should be small (famous last words..), it won't hurt to just recreate it every time + #XXX SQL INJECTION + # --> there's no way to get around this injection with a prepared statement, since postgres only allows specifying values, not tables, as prepared statement parameters + # plpy.execute("DROP TRIGGER IF EXISTS watch_table on %s" % (_table,)) plpy.execute( """CREATE TRIGGER watch_table @@ -181,10 +204,29 @@ $$ LANGUAGE plpython2u; select 'installing my_pg_replicate_unregister'; CREATE OR REPLACE FUNCTION my_pg_replicate_unregister(stream_id text) RETURNS bool AS $$ + plant = plpy.prepare("select _table from my_pg_replicate_clients where stream_id = $1", ["text"]) + _table = plpy.execute(plant, [stream_id]) + _table = _table[0].values()[0] #scalarify + plant = plpy.prepare("delete from my_pg_replicate_clients where stream_id = $1", ["text"]) plpy.execute(plant, [stream_id]) - return True; #TODO: check if the query did anything (?? does execute() return a list of affected rows or something we can look at?) - # TODO: - # this function should also uninstall triggers if there are no listeners for _table left -$$ LANGUAGE plpython2u; \ No newline at end of file + plant = plpy.prepare("select uses from my_pg_replicate_triggers where _table = $1", ['text']) + uses = plpy.execute(plant, [_table]) + uses = uses[0].values()[0] #scalarify + if uses > 1: + # case 1: simple decrement + plant = plpy.prepare("update my_pg_replicate_triggers set uses = uses - 1 where _table = $1", ['text']) + plpy.execute(plant, [_table]) + else: + # case 2: total removal + # 2a) decrement (which we write as a delete) + plant = plpy.prepare("delete from my_pg_replicate_triggers where _table = $1", ['text']) + plpy.execute(plant, [_table]) + + # 2b) remove the trigger, since there are no more users of it currently + # XXX SQL injection: we cannot use a prepared statement to pass a table :/ + plpy.execute("DROP TRIGGER watch_table on %s" % (_table,)) + + return True; #TODO: check if the stream_id was actually there and was actually deleted +$$ LANGUAGE plpython2u; diff --git a/scratch/psql/replicate.sh b/src/backend/db/replicate.sh similarity index 100% rename from scratch/psql/replicate.sh rename to src/backend/db/replicate.sh diff --git a/scratch/psql/replicate_server.sh b/src/backend/db/replicate_server.sh similarity index 56% rename from scratch/psql/replicate_server.sh rename to src/backend/db/replicate_server.sh index 17f1d41..4823797 100755 --- a/scratch/psql/replicate_server.sh +++ b/src/backend/db/replicate_server.sh @@ -12,19 +12,30 @@ # the multiple-client effect is achieved by socat TCP-LISTEN,...,fork [nc calls this -k] PORT="$1" -TABLE="$2" +DB="$2" +TABLE="$3" -SERVER="./replicate.sh ${TABLE}" +SERVER="./replicate.sh \"${DB}\" \"${TABLE}\"" # rather than use up a TCP port for the websockify <--> socat connection # use a unix domain socket instead # TODO: name this better / use mkstemp / something -IPC=/tmp/s_websockify_replicate_$table +IPC=/tmp/s_websockify_replicate_${TABLE} + + +# terminate all children (websockify, etc) on exit +# tip from http://stackoverflow.com/questions/360201/kill-background-process-when-shell-script-exit +# but note that we trap both "TERM" which is a unix signal and "EXIT" which is a bash-ism that catches normal termination +# also, the alternate form 'kill 0' which sends TERM to *all descendents* (not just immediate ones) +trap 'kill $(jobs -p)' TERM EXIT +#trap 'kill 0' TERM EXIT # start up websockify, daemonized, waiting for connections to proxy to socat -websockify -D $PORT --unix-target=$IPC +websockify $PORT --unix-target=$IPC & #NB: it is important; it might be more robust to just give up on bash and use subprocess.py instead... # start up socat, proxying TCP to the replication server # reusaddr gets around lingering (e.g. TIME_WAIT) connections blocking the new bind(), # so that you can stop and restart this script immediately socat UNIX-LISTEN:$IPC,fork,reuseaddr EXEC:"$SERVER" +# we don't background socat because we need something to hold this script open + diff --git a/src/backend/db/reset.sh b/src/backend/db/reset.sh new file mode 100755 index 0000000..946c3a1 --- /dev/null +++ b/src/backend/db/reset.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +# usage: reset.sh +# +# TODO: if user gives PGDATABASE, only wipe that database(?) +# this script is a solution looking for a problem. + +pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null +source $HERE/pg_vars.sh + +if [ -d $PGDATA ]; then + echo "Removing $PGDATA" + rm -r $PGDATA +fi diff --git a/src/backend/db/server.sh b/src/backend/db/server.sh new file mode 100755 index 0000000..772c919 --- /dev/null +++ b/src/backend/db/server.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# usage: +# [optional] export PGDATA=... # path to where to the postgres backend data files +# [optional] export PGHOST=... #path to where to place the listening socket +# [optional] export PGPORT=11234 +# ./server.sh +# + +# this script lets you run postgres without needing root +# it also handles any ugly platform-specific cruft +# +# before running this run init.sh + +pushd $(dirname $0) >/dev/null; HERE=`pwd`; popd >/dev/null +source $HERE/pg_vars.sh + +if [ `uname` = "Darwin" ]; then + # OS X has several possible python distros + # our supported configuration uses Postgres.app, + # which is built linked against the Apple distro of python + # so we need to ensure the Apple python is the one loaded + # or else strange library errors will crop up + # + # But there's different linking errors on the client side, so + # this line is in server.sh to keep the invasiveness small. + + export PATH=/usr/bin:$PATH +fi + +#'-h ""' disables TCP +#'"-k ."' is the magic that means "put your socket file here and not in /var/run" which you would need to with fight permissions for +postgres -D $PGDATA -h "" -k $PGHOST diff --git a/scratch/psql/films.sql b/src/backend/db/tests/films.sql similarity index 100% rename from scratch/psql/films.sql rename to src/backend/db/tests/films.sql diff --git a/scratch/psql/test.psql b/src/backend/db/tests/test.psql similarity index 78% rename from scratch/psql/test.psql rename to src/backend/db/tests/test.psql index 42ff153..9941a73 100644 --- a/scratch/psql/test.psql +++ b/src/backend/db/tests/test.psql @@ -7,7 +7,11 @@ DROP TABLE IF EXISTS films CASCADE; CREATE TABLE films (name text, kind text, rating int); -\COPY films FROM './films.sql'; -- COPY runs on the server, Hence, the client commanding the server to load a file *must give a full path*, because it doesn't know where the server is. Conversely, \COPY runs on the client, which, if the server is remote, might be a large amount of data. See +\COPY films FROM './tests/films.sql'; +-- COPY vs \COPY: +-- COPY runs on the server, Hence, the client commanding the server to load a file *must give a full path*, +-- because in principle it doesn't know where the server is. +-- Conversely, \COPY runs on the client, which, if the server is remote, might be a large amount of data. See INSERT INTO films VALUES ('Grass', 'Documentary', 0); INSERT INTO films VALUES ('The Mail Man', 'Documentary', 3); diff --git a/src/frontend/cacheset.js b/src/frontend/cacheset.js deleted file mode 100644 index bc462e8..0000000 --- a/src/frontend/cacheset.js +++ /dev/null @@ -1,470 +0,0 @@ -/* cacheset: an in-memory in-javascript database system based around multisets instead of around tables or documents (though any can be translated to the others) and around dataflow programming. - * - * The system provides subtypes (subset, and, or, not, columns, distinct, sum, average, min, max, groupBy, ...). - * with each type maintaining a live cache of its current state, as computed from whatever it is based on (necessarily, then, the supported operations are limited to whatever can be efficiently computed on a stream; limited to roughly what SQL provides, in fact). - - * The main concept here is the caching, which is a lot like postgres's materialized view; but unlike postgres's implementation, this does edge-triggered processing: it reacts to changes pushed from a source instead of having to poll a source to get a complete new copy. - * Since JS is pointers-everywhere, for the types which compute new sets (subset, and, or, not) are storage-cheap: each element only actually exists once; the storage requirement for each type is only sizeof(js_ptr)*cache.length. The other types do not have this guarantee (in particular, Columns has to create new objects) - * - * - * Dependent types listen to their parent's "insert" and "delete" events; when a parent's cache is updated it fires an event, which causes a chain reaction of dependents to check if they should update their cache, and fire their events. - * - * - * Garbage Collection: - * - it is important that dependees hold strong references to their parents and that parents hold (at most) weak references to their children - * e.g. if you say - * var C = new CacheSet([...]); - * var large = C.subset(function(e) { e.width > 9 }); - * var v = large.columns(["name", "type"]).distinct(); - * then you have must have a chain of strong references - * v (a Distinct) -> Columns -> SubSet -> CacheSet - * which is good and right because v ultimately depends on all of those - * but if you - * delete v; (or otherwise lose its reference) - * you do not care about either v or the anonymous Columns instance either, and expect them to be garbage collected; however, because they would have registered event listeners with their parents, their parents are holding a pointer to something that eventually leads to them; these pointers must therefore be weak references or else we will leak memory. - - * TODO: clean up terminology; decide between parents,children and dependees,dependents - */ - - - -// how do I extend the array type? -// do I want to extend the array type? - -// CacheSet has an API similar to but only overlapping with--not inheriting--the Array API -// hence, we wrap an array -// Notably, *this type is not actually a set* but rather a multiset since it allows multiple copies of a single item -// (Use Case: cloning SQL db tables; in practice, all SQL rows should have a unique ID, which automatically, but since SQL allows this not to be true I need to support it not being true) - - -// TODO: support updates; right now, the API forces you to perform updates as a delete foll -// TODO: are there gnarly edge cases where a delete,insert != insert,delete ? perhaps made gnarlier by the identity issues that is() represents? - -//var _ = require('../../assets/libs/underscore.js'); //?? -//var PourOver = require('../../assets/libs/pourover.js'); - -//module.exports = CacheSet - -/* is essentially defines what we consider to be an equal object - * we could also use underscore's _.isEqual() which does recursive value comparison - * or == which has some nasty corner cases that === avoids - */ - // TODO: test identity issues. What happens if we insert three "3"s then remove two of them? What if we do an equivalent test but with objects? what if the objects are literally the same object, and what if they are clones? I use === which performs python's "is" operator but only on two objects; on two primitive types (ints, floats) it does value comparison, and strings seem to exist in the middle: "a" === "a" is true but new String("a") === new String("a") is false ((probably because js does string interning, like python used to do) -// , which might be desirable but also might make problems with double -// ...hm. since my use case plans to eat deletions as json like {-: obj}, obj will *not* be identity-equal with the target -// one solution: switch to _.isEqual(); pro: does what I need; con: slow; potentially unbounded runtime; con: -// another: design the API--like PourOver--to demand .remove() is given objects from itself (so that we can depend on identity-equals). Then, at the layer that speaks to the WebSocket, do the conversion. This way, the _.isEqual() cruft only has to happen at one layer, and the whole dataflow chain from then on can just use === -// is-ness matters for: deletes and Distinct -function is(a,b) { - //return a === b; - return _.isEqual(a,b); //rational for putting this in here: yes, it slows things down, but it allows .delete() to behave - // maybe we should write it that .delete() is special cased to use isEqual -} - -/* helper method which many of the things share - * NB: .findIndex is a gecko extension, which we've imported by a polyfill in libs - * this - */ -function findIndex(self, e) { - return self._cache.findIndex(function(g) { return is(g,e); }); -} - -function add(self, e) { - self._cache.push(e); - //TODO: keep sorted - - self.trigger("insert", e); - self.trigger("rerender"); -} - -function remove(self, e) { - // find an element equal to e; note: there might be more than one! - i = findIndex(self, e); - //i = self._cache.indexOf(e); - //i = self._cache.findIndex(function(g) { return _.isEqual(g, e); }) - //TODO: once the caches are kept sorted, use a binary search instead - - if(i >= 0) { //silly bug: "if(i)" is false if i==0, but i==0 is a valid result from indexOf - self._cache.splice(i, 1); - self.trigger("delete", e); - self.trigger("rerender"); - } -} - - - -/* class Table - * - * A Table implements a MultiSet of objects. It is intentionally very similar to a SQL table. - * - * API: - * - .on("insert", new_row) -- fired AFTER insertion - * - .on("delete", old_row) -- fired AFTER deletion - * - FUTURE: .on("update", old_row, new_row) -- fired AFTER updating - * - .on("rerender") -- fired AFTER inserts, updates, deletes - * - FUTURE: .at, .toArray(), etc(??) - * - CURRENT: use ._cache to access the current state - * - Operators: - * S.and(Q), S.or(Q) - * or And(S, Q, ...), Or(S, Q, ...) - * S.where(pred) - filter the set by a predicate - * S.distinct() - choose only unique items, as compared by underscore's _.isEqqual() - * S.map(f) - convert elements to a new element via a map function - * S.select(fields) - slice out new objects from - * S.scalar(field) - choose only the given field - * - * FUTURE: S.reduce((prev, insert) -> next, (prev, ) - compact the table into a single value - using this reduce is a bit unusual compared to the classic reduce, because you "prev" value needs to be as much data as you need to handle both inserts and deletes; for 'sum' prev is simply the value of the sum (since a delete can be applied with subtraction), but for more complica - * FUTURE: S.sum() - only works on tables of numbers; will throw an exception if it hits - - * - FUTURE: Join(S, Q, fields) - * - * roughly three categories: filters (and, or, not, where, distinct), maps (map, select, scalar), reduces (sum, avg, - * - * TODO: pick a more descriptive and accurate name - * TODO: implement .length - * TODO: implement iterators or element access or something (rather than telling clients to just use ._cache); or, subclass Array and use its built in indexers - */ -function CacheSet(seed) { - // if 'new' was not used - if(! (this instanceof CacheSet)) return new CacheSet(seed); - - if(seed === null) { seed = new Array(); } - this._cache = seed.slice(0); //shallow copy the seed array - -} -_.extend(CacheSet.prototype, PourOver.Events); // TODO: use Backbone.Events instead, since that's the original source -// alternately, roll these ideas into PourOver, though that will be difficult without breaking PourOver, or at least its zen. - - - -CacheSet.prototype.insert = function(e) { - add(this, e); -} - -CacheSet.prototype.delete = function(e) { - remove(this, e); -} - -CacheSet.prototype.subset = function(pred) { - return new SubSet(this, pred); -} - -/* A SubSet is a filtered slice of a CacheSet - * and is itself considered a CacheSet - * Like a SQL View, a SubSet - * but a SubSet is more like a Materialized View (c.f. Postgres; also this hack in MySQL: http://www.fromdual.com/mysql-materialized-views) - // a SubSet S = SubSet(P, pred) by definition is supposed to maintain the invariant that - // S = { e for e in P if pred(e) } - // which equivalently means - // \forall `e`: `e in P` and `pred(e)` => `e in S` -*/ -function SubSet(parent, pred) { - - // if 'new' was not used - if(! (this instanceof SubSet)) return new SubSet(parent, pred); - - //XXX this would be more elegant if it was a subclass of CacheSet, - // but I don't know how to do js inheritance properly - // also, it's not clear if a SubSet should allow direct .insert() and .delete()s - // TODO: implement .subset(), at least - - var self = this; - - self._pred = pred; - self._cache = parent._cache.filter(self._pred); - - - // whoops; .on() runs its callback in the scope of parent. - // bah. dynamic scopppinggggg!!! - parent.on("insert", function(e) { - if(self._pred(e)) { - add(self, e); - } - }); - - parent.on("delete", function(e) { - - // if parent.delete(e) actually completes (thus triggering this) - // then we know `e in P`. - // if we also have `pred(e)` - // then, by the invariant, - // we imply - // e *is* in self - - if(self._pred(e)) { - remove(self, e); - } - }); -} -SubSet.prototype.subset = function(pred) { - return new SubSet(this, pred); -} -_.extend(SubSet.prototype, PourOver.Events); - - -// TODO: write a o_cmp which can be used by Array.sort() to order objects; by default, sort() misbehaves on objects - -function And(A, B) { - // if 'new' was not used - if(! (this instanceof And)) return new And(A, B); - var self = this; - - self._A = A; - self._B = B; - - // Compute (A and B) as [e for e in A if e in B] - //TODO: exploit sorting to make this faster - // as written, this is an O(n^2) step - self._cache = self._A._cache.filter(function(a) { - return self._B._cache.indexOf(a) != -1; - }); - - // now, an incoming item can only come in if it is BOTH in A and in B - // that means we cannot add(self, e) until we have heard A.on("insert", e) and B.on("insert", e) - // so, we maintain a limbo state, of items we're waiting to hear from A and from B about - // this is totally symmetric about A,B (because AND is commutative), so without loss of generality, suppose A.on("insert",e) happens, and suppose there is nothing else but e (since each item is essentially in a slice unto itself) - // then there are four possible next events, as far as this object is concerned: - // A.on("insert", e) --> in this case, e is a duplicate; since we *allow* duplicates, we just act as if e is a new unique item (i.e. this case isn't really a distinct case at all) - // A.on("delete", e) --> remove e from the limbo queue - // B.on("insert", e) --> remove e from the limbo queue and put it into cache - // B.on("delete", e) --> we haven't heard about e yet; ignore - // -> if, before we hear about e from - - // hmm how do I factor this nicely? clever closures, probably... maybe an array (so A_limbo == limbo[0], B_limbo==limbo[1]...; which makes generalizing to n ANDed terms easy) - A_limbo = []; - B_limbo = []; - - self._A.on("insert", function(e) { - // 1) check if e is in B's limbo - if((i = B_limbo.indexOf(e)) != -1) { - B_limbo.splice(i, 1); - add(self, e); - } else { - // 2) we "haven't" seen e yet; queue it - A_limbo.push(e); //TODO: keep sortttted - } - }); - - self._A.on("delete", function(e) { - // three paths: if e is in limbo, eating the limbo copy has priority (XXX does this actually make sense? what if A and B both contain x initially, someone adds an x to A but not B, then deletes it from A.,. so in that case, yes, the - // else, if e is in the cache, take it out, because if just one of A or B fails to have e then A AND B has to fail as well - // if e isn't in in us at all, ignore - - // 1) check if e is in A's limbo - if((i = A_limbo.indexOf(e)) != -1) { - A_limbo.splice(i, 1); - - } else { - // 2) we "haven't" seen e yet; queue it - A_limbo.push(e); //TODO: keep sortttted - remove(self, e); - } - }); - - - self._B.on("insert", function(e) { - // 1) check if e is in B's limbo - if((i = A_limbo.indexOf(e)) != -1) { - A_limbo.splice(i, 1); - add(self, e); - } else { - // 2) we "haven't" seen e yet; queue it - B_limbo.push(e); //TODO: keep sortttted - } - }); - - self._B.on("delete", function(e) { - // three paths: if e is in limbo, eating the limbo copy has priority (XXX does this actually make sense? what if A and B both contain x initially, someone adds an x to A but not B, then deletes it from A.,. so in that case, yes, the - // else, if e is in the cache, take it out, because if just one of A or B fails to have e then A AND B has to fail as well - // if e isn't in in us at all, ignore - - // 1) check if e is in A's limbo - if((i = B_limbo.indexOf(e)) != -1) { - B_limbo.splice(i, 1); - - } else { - // 2) we "haven't" seen e yet; queue it - B_limbo.push(e); //TODO: keep sortttted - remove(self, e); - } - }); -} -_.extend(And.prototype, PourOver.Events); - -// I really need some sort of SortedArray type which has, like, merge() and filter() ops - -function Or(A, B) { - if(! (this instanceof Or)) return new Or(A, B); - var self = this; - - self._A = A; - self._B = B; - - // Compute (A or B) as (A concat B) - (A and B) ((where this set- only removes *one* copy per item)) - //TODO: exploit sorting to make this faster - // as written, this is an O(n^2) step - // in fact, it's *even worse* here than in And(), since not only is there the n^2 And step, then there's a tedious n^2 filtering out step - // this pains me so much - - self._cache = self._A._cache.concat(self._B._cache); - intersection = self._A._cache.filter(function(a) { - return self._B._cache.indexOf(a) != -1; - }); - - //console.debug("naive unioning left", intersection, " duplicated; erasing"); - // remove exactly one copy of each intersection element from the cache - for(i = 0; i - +