parallel.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import functools
  2. from toolz.itertoolz import partition_all
  3. from toolz.utils import no_default
  4. def _reduce(func, seq, initial=None):
  5. if initial is None:
  6. return functools.reduce(func, seq)
  7. else:
  8. return functools.reduce(func, seq, initial)
  9. def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
  10. """
  11. Reduce without guarantee of ordered reduction.
  12. Parameters
  13. ----------
  14. binops
  15. Associative operator. The associative property allows us to
  16. leverage a parallel map to perform reductions in parallel.
  17. inputs:
  18. ``binop`` - associative operator. The associative property allows us to
  19. leverage a parallel map to perform reductions in parallel.
  20. ``seq`` - a sequence to be aggregated
  21. ``default`` - an identity element like 0 for ``add`` or 1 for mul
  22. ``map`` - an implementation of ``map``. This may be parallel and
  23. determines how work is distributed.
  24. ``chunksize`` - Number of elements of ``seq`` that should be handled
  25. within a single function call
  26. ``combine`` - Binary operator to combine two intermediate results.
  27. If ``binop`` is of type (total, item) -> total
  28. then ``combine`` is of type (total, total) -> total
  29. Defaults to ``binop`` for common case of operators like add
  30. Fold chunks up the collection into blocks of size ``chunksize`` and then
  31. feeds each of these to calls to ``reduce``. This work is distributed
  32. with a call to ``map``, gathered back and then refolded to finish the
  33. computation. In this way ``fold`` specifies only how to chunk up data but
  34. leaves the distribution of this work to an externally provided ``map``
  35. function. This function can be sequential or rely on multithreading,
  36. multiprocessing, or even distributed solutions.
  37. If ``map`` intends to serialize functions it should be prepared to accept
  38. and serialize lambdas. Note that the standard ``pickle`` module fails
  39. here.
  40. Example
  41. -------
  42. >>> # Provide a parallel map to accomplish a parallel sum
  43. >>> from operator import add
  44. >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
  45. 10
  46. """
  47. assert chunksize > 1
  48. if combine is None:
  49. combine = binop
  50. chunks = partition_all(chunksize, seq)
  51. # Evaluate sequence in chunks via map
  52. if default == no_default:
  53. results = map(
  54. functools.partial(_reduce, binop),
  55. chunks)
  56. else:
  57. results = map(
  58. functools.partial(_reduce, binop, initial=default),
  59. chunks)
  60. results = list(results) # TODO: Support complete laziness
  61. if len(results) == 1: # Return completed result
  62. return results[0]
  63. else: # Recurse to reaggregate intermediate results
  64. return fold(combine, results, map=map, chunksize=chunksize)