1 /** Pipeline composition.
2  *
3  *  Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
4  *  Copyright: © 2016 Adrian Matoga
5  *  License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
6  */
7 module flod.pipeline;
8 
9 import std.range : isDynamicArray, isInputRange;
10 import std.typecons : Flag, Yes, No;
11 
12 import flod.meta : NonCopyable, str;
13 import flod.range;
14 import flod.traits;
15 
16 version(unittest) {
	import std.algorithm : min, max, map, copy;
	import std.conv : to;
	import std.experimental.logger : logf, errorf;
	import std.range : isInputRange, ElementType, array, take;
	import std.string : split, toLower, startsWith, endsWith;
22 
23 	ulong[] inputArray;
24 	ulong[] outputArray;
25 	size_t outputIndex;
26 
27 	uint filterMark(string f) {
28 		f = f.toLower;
29 		uint fm;
30 		if (f.startsWith("pull"))
31 			fm = 1;
32 		else if (f.startsWith("push"))
33 			fm = 2;
34 		else if (f.startsWith("alloc"))
35 			fm = 3;
36 		if (f.endsWith("pull"))
37 			fm |= 1 << 2;
38 		else if (f.endsWith("push"))
39 			fm |= 2 << 2;
40 		else if (f.endsWith("alloc"))
41 			fm |= 3 << 2;
42 		return fm;
43 	}
44 
45 	ulong filter(string f)(ulong a) {
46 		enum fm = filterMark(f);
47 		return (a << 4) | fm;
48 	}
49 
50 	// sources:
51 	struct Arg(alias T) { bool constructed = false; }
52 
	/// Boilerplate mixed into every test stage. It derives `Stage` (the template,
	/// or plain struct, that the enclosing type comes from), disables copying and
	/// assignment, and adds a constructor that takes an `Arg!Stage`.
	mixin template TestStage(N...) {
		alias This = typeof(this);
		// Recover the template that the enclosing struct is an instance of. The
		// patterns cover two type parameters, one type parameter and one alias
		// parameter, in that order.
		static if (is(This == A!(B, C), alias A, B, C))
			alias Stage = A;
		else static if (is(This == D!(E), alias D, E))
			alias Stage = D;
		else static if (is(This == F!G, alias F, alias G))
			alias Stage = F;
		// None of the template patterns matched: use the type itself as the stage.
		else static if (is(This))
			alias Stage = This;
		else
			static assert(0, "don't know how to get stage from " ~ This.stringof ~ " (" ~ str!This ~ ")");

		// Stages must not be copied or assigned.
		@disable this(this);
		@disable void opAssign(typeof(this));

		// this is to ensure that construct() calls the right constructor for each stage
		this(Arg!Stage arg) { this.arg = arg; this.arg.constructed = true; }
		// Copy of the constructor argument, with `constructed` set to true.
		Arg!Stage arg;
	}
73 
74 	@pullSource!ulong
75 	struct TestPullSource {
76 		mixin TestStage;
77 
78 		size_t pull(ulong[] buf)
79 		{
80 			auto len = min(buf.length, inputArray.length);
81 			buf[0 .. len] = inputArray[0 .. len];
82 			inputArray = inputArray[len .. $];
83 			return len;
84 		}
85 	}
86 
87 	@peekSource!ulong
88 	struct TestPeekSource {
89 		mixin TestStage;
90 
91 		const(ulong)[] peek(size_t n)
92 		{
93 			auto len = min(max(n, 2909), inputArray.length);
94 			return inputArray[0 .. len];
95 		}
96 
97 		void consume(size_t n) { inputArray = inputArray[n .. $]; }
98 	}
99 
100 	@pushSource!ulong
101 	struct TestPushSource(Sink) {
102 		mixin TestStage;
103 		Sink sink;
104 
105 		void run()()
106 		{
107 			while (inputArray.length) {
108 				auto len = min(1337, inputArray.length);
109 				if (sink.push(inputArray[0 .. len]) != len)
110 					break;
111 				inputArray = inputArray[len .. $];
112 			}
113 		}
114 	}
115 
116 	@allocSource!ulong
117 	struct TestAllocSource(Sink) {
118 		mixin TestStage;
119 		Sink sink;
120 
121 		void run()()
122 		{
123 			ulong[] buf;
124 			while (inputArray.length) {
125 				auto len = min(1337, inputArray.length);
126 				if (!sink.alloc(buf, len))
127 					assert(0);
128 				buf[0 .. len] = inputArray[0 .. len];
129 				if (sink.commit(len) != len)
130 					break;
131 				inputArray = inputArray[len .. $];
132 			}
133 		}
134 	}
135 
136 	// sinks:
137 
138 	@pullSink!ulong
139 	struct TestPullSink(Source) {
140 		mixin TestStage;
141 		Source source;
142 
143 		void run()
144 		{
145 			while (outputIndex < outputArray.length) {
146 				auto len = min(4157, outputArray.length - outputIndex);
147 				auto pd = source.pull(outputArray[outputIndex .. outputIndex + len]);
148 				outputIndex += pd;
149 				if (pd < len)
150 					break;
151 			}
152 		}
153 	}
154 
155 	@peekSink!ulong
156 	struct TestPeekSink(Source) {
157 		mixin TestStage;
158 		Source source;
159 
160 		void run()
161 		{
162 			while (outputIndex < outputArray.length) {
163 				auto len = min(4157, outputArray.length - outputIndex);
164 				auto ib = source.peek(len);
165 				auto olen = min(len, ib.length, 6379);
166 				outputArray[outputIndex .. outputIndex + olen] = ib[0 .. olen];
167 				outputIndex += olen;
168 				source.consume(olen);
169 				if (olen < len)
170 					break;
171 			}
172 		}
173 	}
174 
175 	@pushSink!ulong
176 	struct TestPushSink {
177 		mixin TestStage;
178 
179 		size_t push(const(ulong)[] buf)
180 		{
181 			auto len = min(buf.length, outputArray.length - outputIndex);
182 			if (len) {
183 				outputArray[outputIndex .. outputIndex + len] = buf[0 .. len];
184 				outputIndex += len;
185 			}
186 			return len;
187 		}
188 	}
189 
190 	@allocSink!ulong
191 	struct TestAllocSink {
192 		mixin TestStage;
193 		ulong[] last;
194 
195 		bool alloc(ref ulong[] buf, size_t n)
196 		{
197 			if (n < outputArray.length - outputIndex)
198 				buf = outputArray[outputIndex .. outputIndex + n];
199 			else
200 				buf = last = new ulong[n];
201 			return true;
202 		}
203 
204 		size_t commit(size_t n)
205 		out(result) { assert(result <= n); }
206 		body
207 		{
208 			if (!last) {
209 				outputIndex += n;
210 				return n;
211 			} else {
212 				auto len = min(n, outputArray.length - outputIndex);
213 				outputArray[outputIndex .. outputIndex + len] = last[0 .. len];
214 				outputIndex += len;
215 				return len;
216 			}
217 		}
218 	}
219 
220 	// filter
221 
222 	@peekSink!ulong @peekSource!ulong
223 	struct TestPeekFilter(Source) {
224 		mixin TestStage;
225 		Source source;
226 		const(ulong)[] peek(size_t n)
227 		{
228 			return source.peek(n).map!(filter!"peek").array();
229 		}
230 		void consume(size_t n) { source.consume(n); }
231 	}
232 
233 	@peekSink!ulong @pullSource!ulong
234 	struct TestPeekPullFilter(Source) {
235 		mixin TestStage;
236 		Source source;
237 		size_t pull(ulong[] buf)
238 		{
239 			auto ib = source.peek(buf.length);
240 			auto len = min(ib.length, buf.length);
241 			ib.take(len).map!(filter!"peekPull").copy(buf);
242 			source.consume(len);
243 			return len;
244 		}
245 	}
246 
247 	@peekSink!ulong @pushSource!ulong
248 	struct TestPeekPushFilter(Source, Sink) {
249 		mixin TestStage;
250 		Source source;
251 		Sink sink;
252 		void run()()
253 		{
254 			for (;;) {
255 				auto ib = source.peek(4096);
256 				auto ob = ib.map!(filter!"peekPush").array();
257 				source.consume(ib.length);
258 				if (sink.push(ob) < 4096)
259 					break;
260 			}
261 		}
262 	}
263 
264 	@peekSink!ulong @allocSource!ulong
265 	struct TestPeekAllocFilter(Source, Sink) {
266 		mixin TestStage;
267 		Source source;
268 		Sink sink;
269 		void run()()
270 		{
271 			ulong[] buf;
272 			for (;;) {
273 				auto ib = source.peek(4096);
274 				if (!sink.alloc(buf, ib.length))
275 					assert(0);
276 				auto len = min(ib.length, buf.length);
277 				ib.take(len).map!(filter!"peekAlloc").copy(buf);
278 				source.consume(len);
279 				if (sink.commit(len) < 4096)
280 					break;
281 			}
282 		}
283 	}
284 
285 	@pullSink!ulong @pullSource!ulong
286 	struct TestPullFilter(Source) {
287 		mixin TestStage;
288 		Source source;
289 		size_t pull(ulong[] buf)
290 		{
291 			size_t n = source.pull(buf);
292 			foreach (ref b; buf[0 .. n])
293 				b = b.filter!"pull";
294 			return n;
295 		}
296 	}
297 
298 	@pullSink!ulong @peekSource!ulong
299 	struct TestPullPeekFilter(Source) {
300 		mixin TestStage;
301 		Source source;
302 		const(ulong)[] peek(size_t n)
303 		{
304 			auto buf = new ulong[n];
305 			size_t m = source.pull(buf[]);
306 			foreach (ref b; buf[0 .. m])
307 				b = b.filter!"pullPeek";
308 			return buf[0 .. m];
309 		}
310 		void consume(size_t n) {}
311 	}
312 
313 	@pullSink!ulong @pushSource!ulong
314 	struct TestPullPushFilter(Source, Sink) {
315 		mixin TestStage;
316 		Source source;
317 		Sink sink;
318 		void run()()
319 		{
320 			for (;;) {
321 				ulong[4096] buf;
322 				auto n = source.pull(buf[]);
323 				foreach (ref b; buf[0 .. n])
324 					b = b.filter!"pullPush";
325 				if (sink.push(buf[0 .. n]) < 4096)
326 					break;
327 			}
328 		}
329 	}
330 
331 	@pullSink!ulong @allocSource!ulong
332 	struct TestPullAllocFilter(Source, Sink) {
333 		mixin TestStage;
334 		Source source;
335 		Sink sink;
336 		void run()()
337 		{
338 			for (;;) {
339 				ulong[] buf;
340 				if (!sink.alloc(buf, 4096))
341 					assert(0);
342 				auto n = source.pull(buf[]);
343 				foreach (ref b; buf[0 .. n])
344 					b = b.filter!"pullAlloc";
345 				if (sink.commit(n) < 4096)
346 					break;
347 			}
348 		}
349 	}
350 
351 	@pushSink!ulong @pushSource!ulong
352 	struct TestPushFilter(Sink) {
353 		mixin TestStage;
354 		Sink sink;
355 		size_t push(const(ulong)[] buf)
356 		{
357 			return sink.push(buf.map!(filter!"push").array());
358 		}
359 	}
360 
361 	@pushSink!ulong @allocSource!ulong
362 	struct TestPushAllocFilter(Sink) {
363 		mixin TestStage;
364 		Sink sink;
365 		size_t push(const(ulong)[] buf)
366 		out(result) { assert(result <= buf.length); }
367 		body
368 		{
369 			ulong[] ob;
370 			if (!sink.alloc(ob, buf.length))
371 				assert(0);
372 			auto len = min(buf.length, ob.length);
373 			buf.take(len).map!(filter!"pushAlloc").copy(ob);
374 			return sink.commit(len);
375 		}
376 	}
377 
378 	@pushSink!ulong @pullSource!ulong
379 	struct TestPushPullFilter(alias Scheduler) {
380 		mixin Scheduler;
381 		mixin TestStage;
382 		ulong[] buffer;
383 
384 		size_t push(const(ulong)[] buf)
385 		{
386 			buffer ~= buf.map!(filter!"pushPull").array();
387 			if (yield())
388 				return 0;
389 			return buf.length;
390 		}
391 
392 		size_t pull(ulong[] buf)
393 		{
394 			size_t n = buf.length;
395 			while (buffer.length < n) {
396 				if (yield())
397 					break;
398 			}
399 			size_t len = min(n, buffer.length);
400 			buf[0 .. len] = buffer[0 .. len];
401 			buffer = buffer[len .. $];
402 			return len;
403 		}
404 	}
405 
406 	@pushSink!ulong @peekSource!ulong
407 	struct TestPushPeekFilter(alias Scheduler) {
408 		mixin Scheduler;
409 		mixin TestStage;
410 		ulong[] buffer;
411 
412 		size_t push(const(ulong)[] buf)
413 		{
414 			buffer ~= buf.map!(filter!"pushPeek").array();
415 			if (yield())
416 				return 0;
417 			return buf.length;
418 		}
419 
420 		const(ulong)[] peek(size_t n)
421 		{
422 			while (buffer.length < n) {
423 				if (yield())
424 					break;
425 			}
426 			return buffer;
427 		}
428 
429 		void consume(size_t n)
430 		{
431 			buffer = buffer[n .. $];
432 		}
433 	}
434 
435 	@allocSink!ulong @allocSource!ulong
436 	struct TestAllocFilter(Sink) {
437 		mixin TestStage;
438 		Sink sink;
439 		ulong[] buf;
440 
441 		bool alloc(ref ulong[] buf, size_t n)
442 		{
443 			auto r = sink.alloc(buf, n);
444 			this.buf = buf;
445 			return r;
446 		}
447 
448 		size_t commit(size_t n)
449 		{
450 			foreach (ref b; buf[0 .. n])
451 				b = b.filter!"alloc";
452 			return sink.commit(n);
453 		}
454 	}
455 
456 	@allocSink!ulong @pushSource!ulong
457 	struct TestAllocPushFilter(Sink) {
458 		mixin TestStage;
459 		Sink sink;
460 		ulong[] buffer;
461 
462 		bool alloc(ref ulong[] buf, size_t n)
463 		{
464 			buffer = buf = new ulong[n];
465 			return true;
466 		}
467 
468 		size_t commit(size_t n)
469 		{
470 			size_t m = sink.push(buffer[0 .. n].map!(filter!"allocPush").array());
471 			buffer = buffer[m .. $];
472 			return m;
473 		}
474 	}
475 
476 	@allocSink!ulong @pullSource!ulong
477 	struct TestAllocPullFilter(alias Scheduler) {
478 		mixin Scheduler;
479 		mixin TestStage;
480 		ulong[] buffer;
481 		size_t readOffset;
482 		size_t writeOffset;
483 
484 		bool alloc(ref ulong[] buf, size_t n)
485 		{
486 			buffer.length = writeOffset + n;
487 			buf = buffer[writeOffset .. $];
488 			return true;
489 		}
490 
491 		size_t commit(size_t n)
492 		{
493 			foreach (ref b; buffer[writeOffset .. writeOffset + n])
494 				b = b.filter!"allocPull";
495 			writeOffset += n;
496 			if (yield())
497 				return 0;
498 			return n;
499 		}
500 
501 		size_t pull(ulong[] buf)
502 		{
503 			size_t n = buf.length;
504 			while (writeOffset - readOffset < n) {
505 				if (yield())
506 					break;
507 			}
508 			size_t len = min(n, writeOffset - readOffset);
509 			buf[0 .. len] = buffer[readOffset .. readOffset + len];
510 			readOffset += len;
511 			return len;
512 		}
513 	}
514 
515 	@allocSink!ulong @peekSource!ulong
516 	struct TestAllocPeekFilter(alias Scheduler) {
517 		mixin Scheduler;
518 		mixin TestStage;
519 		ulong[] buffer;
520 		size_t readOffset;
521 		size_t writeOffset;
522 
523 		bool alloc(ref ulong[] buf, size_t n)
524 		{
525 			buffer.length = writeOffset + n;
526 			buf = buffer[writeOffset .. $];
527 			return true;
528 		}
529 
530 		size_t commit(size_t n)
531 		{
532 			foreach (ref b; buffer[writeOffset .. writeOffset + n])
533 				b = b.filter!"allocPeek";
534 			writeOffset += n;
535 			if (yield())
536 				return 0;
537 			return n;
538 		}
539 
540 		const(ulong)[] peek(size_t n)
541 		{
542 			while (writeOffset - readOffset < n) {
543 				if (yield())
544 					break;
545 			}
546 			return buffer[readOffset .. writeOffset];
547 		}
548 
549 		void consume(size_t n)
550 		{
551 			readOffset += n;
552 		}
553 	}
554 
555 	string genStage(string filter, string suf)
556 	{
557 		import std.ascii : toUpper;
558 		auto cf = filter[0].toUpper ~ filter[1 .. $];
559 		return "pipe!Test" ~ cf ~ suf ~ "(Arg!Test" ~ cf ~ suf ~ "())";
560 	}
561 
562 	string genChain(string filterList)
563 	{
564 		import std.algorithm : map;
565 		import std.array : join, split;
566 		auto filters = filterList.split(",");
567 		string midstr;
568 		if (filters.length > 2)
569 			midstr = filters[1 .. $ - 1].map!(f => "." ~ genStage(f, "Filter")).join;
570 		return genStage(filters[0], "Source")
571 			~ midstr
572 			~ "." ~ genStage(filters[$ - 1], "Sink") ~ ";";
573 	}
574 
	/// Runs the pipeline described by `filterlist` (comma-separated: source,
	/// filters..., sink) on the elements of `r` and asserts that the output is
	/// what the filters in between should have produced.
	///
	/// The pipeline is run once for each of several output capacities, ranging
	/// from empty to five times the input length.
	void testChain(string filterlist, R)(R r)
		if (isInputRange!R && is(ElementType!R : ulong))
	{
		auto input = r.map!(a => ulong(a)).array();
		logf("Testing %s with %d elements", filterlist, input.length);
		// Compute the expected output by applying the mark of every filter between
		// the source and the sink, in pipeline order.
		auto expectedOutput = input.dup;
		auto filters = filterlist.split(",");
		if (filters.length > 2) {
			foreach (filter; filters[1 .. $ - 1]) {
				auto fm = filterMark(filter);
				foreach (ref eo; expectedOutput)
					eo = (eo << 4) | fm;
			}
		}
		foreach(expectedLength; [ size_t(0), input.length / 3, input.length - 1, input.length,
			input.length + 1, input.length * 5 ]) {
			// Reset the global state shared with the test stages; the output is
			// pre-filled with a recognizable pattern.
			outputArray.length = expectedLength;
			outputArray[] = 0xbadc0ffee0ddf00d;
			inputArray = input;
			outputIndex = 0;
			// Build and execute the pipeline statement generated from filterlist.
			mixin(genChain(filterlist));
			auto len = min(outputIndex, expectedLength, input.length);
			uint left = 8; // log at most this many individual mismatches
			size_t all = 0; // total number of mismatching elements
			if (outputIndex != min(expectedLength, input.length)) {
				errorf("Output length is %d, expected %d", outputIndex, min(expectedLength, input.length));
				assert(0);
			}
			for (size_t i = 0; i < len; i++) {
				if (expectedOutput[i] != outputArray[i]) {
					if (left > 0) {
						logf("expected[%d] != output[%d]: %x vs. %x", i, i, expectedOutput[i], outputArray[i]);
						--left;
					}
					all++;
				}
			}
			if (all > 0) {
				logf("%s", genChain(filterlist));
				logf("total: %d differences", all);
			}
			assert(all == 0);
		}
	}
619 
620 	void testChain(string filterlist)()
621 	{
622 		import std.range : iota;
623 		testChain!filterlist(iota(0, 173447));
624 	}
625 
626 }
627 
/// Switches execution between the code running inside `fiber` and the code
/// that resumes it. Both sides call `yield()` to hand control to the other.
struct SinkDrivenFiberScheduler {
	import core.thread : Fiber;
	Fiber fiber;
	mixin NonCopyable;

	/// Lets a suspended fiber run to completion.
	///
	/// If the fiber is on hold, the `fiber` field is cleared before the fiber is
	/// resumed, so the `yield()` call it was suspended in returns non-zero.
	/// The fiber is then expected to terminate.
	void stop()
	{
		auto f = this.fiber;
		if (f) {
			if (f.state == Fiber.State.HOLD) {
				this.fiber = null;
				f.call();
			}
			assert(f.state == Fiber.State.TERM);
		}
	}

	/// Transfers control to the other side.
	///
	/// Returns: 2 if there is no fiber. When called while the fiber is executing,
	/// the fiber is suspended, and the result after resumption is 1 if `fiber`
	/// has been cleared in the meantime and 0 otherwise. Otherwise, a fiber that
	/// is on hold is resumed, and the result is 0 if the fiber is on hold again
	/// afterwards and 1 if it is not.
	int yield()
	{
		if (fiber is null)
			return 2;
		if (fiber.state == Fiber.State.EXEC) {
			Fiber.yield();
			return fiber is null;
		} else {
			if (fiber.state == Fiber.State.HOLD)
				fiber.call();
			return fiber.state != Fiber.State.HOLD;
		}
	}
}
660 
/// Mixin that gives a stage a `SinkDrivenFiberScheduler` along with `yield`,
/// `spawn` and `stop` members operating on it.
mixin template FiberScheduler() {
	import flod.pipeline;
	SinkDrivenFiberScheduler _flod_scheduler;

	/// Forwards to the scheduler's `yield`.
	int yield()
	{
		return _flod_scheduler.yield();
	}

	/// Creates the fiber that will run `dg`, with a stack size argument of
	/// 65536, unless a fiber already exists.
	void spawn(void delegate() dg)
	{
		import core.thread : Fiber;
		if (_flod_scheduler.fiber)
			return;
		_flod_scheduler.fiber = new Fiber(dg, 65536);
	}

	/// Forwards to the scheduler's `stop`.
	void stop()
	{
		_flod_scheduler.stop();
	}
}
674 
// forwards all calls to its impl
// S is the wrapped stage type. With Yes.readFromSink, the reading and scheduling
// methods (peek, consume, pull, spawn, stop) are forwarded to `_impl.sink`
// instead of `_impl` itself.
private struct Forward(S, Flag!"readFromSink" readFromSink = No.readFromSink) {
	enum name = .str!S;
	S _impl;
	// Passes the arguments to the constructor of the wrapped stage; with no
	// arguments, `_impl` keeps its initial value.
	this(Args...)(auto ref Args args)
	{
		static if (Args.length > 0)
			_impl.__ctor(args);
	}
	// Only logs the destruction, and only in debug(FlodTraceLifetime) builds.
	~this()
	{
		debug(FlodTraceLifetime) {
			import std.experimental.logger : tracef;
			tracef("Destroy %s", name);
		}
	}
	// Access to the neighbouring stages held by the wrapped stage.
	@property ref auto sink()() { return _impl.sink; }
	@property ref auto source()() { return _impl.source; }
	static if (readFromSink) {
		// active source needs to pass pull calls to any push-pull filter at the end of
		// the active source chain, so that an inverter wrapping the whole chain can recover
		// data sunk into the filter.
		auto peek()(size_t n) { return _impl.sink.peek(n); }
		void consume()(size_t n) { _impl.sink.consume(n); }
		auto pull(T)(T[] buf) { return _impl.sink.pull(buf); }
		void spawn()(void delegate() dg) { return _impl.sink.spawn(dg); }
		void stop()() { return _impl.sink.stop(); }
	} else {
		auto peek()(size_t n) { return _impl.peek(n); }
		void consume()(size_t n) { _impl.consume(n); }
		auto pull(T)(T[] buf) { return _impl.pull(buf); }
		void spawn()(void delegate() dg) { return _impl.spawn(dg); }
		void stop()() { return _impl.stop(); }
	}
	// The remaining methods always go to the wrapped stage itself. All forwarders
	// are templates, so each one is compiled only if it is actually used.
	auto run()() { return _impl.run(); }
	auto step(A...)(auto ref A a) { return _impl.step(a); }
	auto alloc(T)(ref T[] buf, size_t n) { return _impl.alloc(buf, n); }
	auto commit()(size_t n) { return _impl.commit(n); }
	auto push(T)(const(T)[] buf) { return _impl.push(buf); }
}
715 
// Stage that presents an actively running source chain as a passive source.
// `method` is the attribute attached to the resulting stage and `T` the element
// type. Reads are served by `source.sink`, and the chain itself is run by a
// delegate passed to `source.sink.spawn`.
private template Inverter(alias method, T) {
	@method
	struct Inverter(Source) {
		import core.thread : Fiber;

		Source source;

		// Stops the scheduler of the stage the chain writes into.
		~this()
		{
			source.sink.stop();
		}

		// Runs the wrapped chain; passed to spawn() as the delegate to execute.
		void run()
		{
			source.run();
		}

		// Hands the chain's run delegate to spawn(), then reads from the stage
		// the chain writes into.
		size_t pull()(T[] buf)
		{
			source.sink.spawn(&run);
			return source.sink.pull(buf);
		}

		// Same as pull(), for the peek interface.
		const(T)[] peek()(size_t n)
		{
			source.sink.spawn(&run);
			return source.sink.peek(n);
		}

		void consume()(size_t n) { source.sink.consume(n); }
	}
}
748 
// Runs the constructor of `t` in place with `args`. A type without a constructor
// is accepted only if no arguments are given, in which case `t` is left as is.
private void constructInPlace(T, Args...)(ref T t, auto ref Args args)
{
	debug(FlodTraceLifetime) {
		import std.experimental.logger : tracef;
		tracef("Construct at %x..%x %s with %s", &t, &t + 1, T.name, Args.stringof);
	}
	static if (!__traits(hasMember, t, "__ctor")) {
		// nothing to call; reject arguments that would be silently dropped
		static if (Args.length > 0) {
			static assert(0, "Stage " ~ str!T ~ " does not have a non-trivial constructor" ~
				" but construction was requested with arguments " ~ Args.stringof);
		}
	} else {
		t.__ctor(args);
	}
}
762 
/** Description of a pipeline (or a part of one) centered on stage `S`.
 *
 *  A `Pipeline` only stores the constructor arguments of its stages. The stage
 *  objects themselves are created by `run` or `create`.
 *
 *  Params:
 *    S   = the stage (struct or struct template) described by this node
 *    SoP = type of the pipeline feeding `S`, or `typeof(null)` if there is none
 *    SiP = type of the pipeline fed by `S`, or `typeof(null)` if there is none
 *    A   = types of the arguments to construct `S` with
 */
private struct Pipeline(alias S, SoP, SiP, A...) {
	alias Stage = S;
	alias Args = A;
	alias SourcePipeline = SoP;
	alias SinkPipeline = SiP;

	enum bool hasSource = !is(SourcePipeline == typeof(null));
	enum bool hasSink   = !is(SinkPipeline == typeof(null));

	// FirstStage and the source* strings come from the source pipeline, if any.
	static if (hasSource) {
		alias FirstStage = SourcePipeline.FirstStage;
		enum sourcePipeStr = SourcePipeline.pipeStr ~ "->";
		enum sourceTreeStr(int indent) = SourcePipeline.treeStr!(indent + 1) ~ "\n";
		enum sourceStr = SourcePipeline.str ~ ".";
	} else {
		alias FirstStage = Stage;
		enum sourcePipeStr = "";
		enum sourceTreeStr(int indent) = "";
		enum sourceStr = "";
	}

	// LastStage and the sink* strings come from the sink pipeline, if any.
	static if (hasSink) {
		alias LastStage = SinkPipeline.LastStage;
		enum sinkPipeStr = "->" ~ SinkPipeline.pipeStr;
		enum sinkTreeStr(int indent) = "\n" ~ SinkPipeline.treeStr!(indent + 1);
		enum sinkStr = "." ~ SinkPipeline.str;
	} else {
		alias LastStage = Stage;
		enum sinkPipeStr = "";
		enum sinkTreeStr(int indent) = "";
		enum sinkStr = "";
	}

	// Element type produced by the pipeline, defined only if the last stage is a source.
	static if (is(Traits!LastStage.SourceElementType W))
		alias ElementType = W;

	// Textual representations of the pipeline structure.
	enum pipeStr = sourcePipeStr ~ .str!S ~ sinkPipeStr;
	enum treeStr(int indent) = sourceTreeStr!indent
		~ "|" ~ repeat!(indent, "-") ~ "-" ~ .str!S
		~ sinkTreeStr!indent;
	enum str = "(" ~ sourceStr ~ .str!Stage ~ sinkStr ~ ")";

	// Concrete types of the neighbouring pipelines, where they can be determined.
	static if (hasSink && is(SinkPipeline.Type T))
		alias SinkType = T;
	static if (hasSource && is(SourcePipeline.Type U))
		alias SourceType = U;

	// `Type` is the concrete, instantiable type of this pipeline: the stage
	// template instantiated with the types of the neighbours it drives, wrapped
	// in `Forward`. It stays undefined if none of the cases below applies.
	static if (isActiveSource!Stage && isActiveSink!Stage && is(SinkType) && is(SourceType))
		alias Type = Forward!(Stage!(SourceType, SinkType));
	else static if (isActiveSource!Stage && !isActiveSink!Stage && is(SinkType))
		alias Type = Forward!(Stage!SinkType, Yes.readFromSink);
	else static if (!isActiveSource!Stage && isActiveSink!Stage && is(SourceType))
		alias Type = Forward!(Stage!SourceType);
	else static if (isPassiveSink!Stage && isPassiveSource!Stage && is(Stage!FiberScheduler SF))
		alias Type = Forward!SF;
	else static if (is(Stage))
		alias Type = Forward!Stage;
	else static if (isPassiveSource!Stage && !isSink!Stage && is(SourceType)) // inverter
		alias Type = Forward!(Stage!SourceType);

	SourcePipeline sourcePipeline;
	SinkPipeline sinkPipeline;
	Args args;

	/** Appends `NextStage`, constructed with `nextArgs`, to this pipeline.
	 *
	 *  The element types of the last stage and of `NextStage` must be identical.
	 *  If their methods are not compatible, an adapter from `flod.adapter` is
	 *  inserted in between. Its name is the source method name followed by the
	 *  capitalized sink method name.
	 *
	 *  Returns: the extended pipeline, if `NextStage` is a source or the first
	 *  stage is a sink. Otherwise the pipeline is run immediately and nothing is
	 *  returned.
	 */
	auto pipe(alias NextStage, NextArgs...)(auto ref NextArgs nextArgs)
	{
		alias SourceE = Traits!LastStage.SourceElementType;
		alias SinkE = Traits!NextStage.SinkElementType;
		static assert(is(SourceE == SinkE), "Incompatible element types: " ~
			.str!LastStage ~ " produces " ~ SourceE.stringof ~ ", while " ~
			.str!NextStage ~ " expects " ~ SinkE.stringof);

		static if (areCompatible!(LastStage, NextStage)) {
			static if (isPassiveSource!Stage) {
				// the next stage drives this one: this pipeline becomes its source
				auto result = pipeline!NextStage(this, null, nextArgs);
			} else {
				static assert(isActiveSource!Stage);
				static if (isPassiveSink!NextStage && isPassiveSource!NextStage) {
					static if (isPassiveSink!Stage) {
						static assert(!hasSource);
						static if (hasSink) {
							auto result = pipeline!Stage(null, sinkPipeline.pipe!NextStage(nextArgs), args);
						} else {
							auto result = pipeline!Stage(null, pipeline!NextStage(null, null, nextArgs), args);
						}
					} else {
						// this stage is driven by nothing, so wrap the chain in an
						// Inverter that exposes NextStage's source method
						static if (hasSink) {
							auto result = pipeline!(Inverter!(sourceMethod!NextStage, SourceE))(
								pipeline!Stage(sourcePipeline, sinkPipeline.pipe!NextStage(nextArgs), args),
								null);
						} else {
							auto result = pipeline!(Inverter!(sourceMethod!NextStage, SourceE))(
								pipeline!Stage(sourcePipeline, pipeline!NextStage(null, null, nextArgs), args),
								null);
						}
					}
				} else {
					// this stage drives the next one: append it to the sink pipeline
					static if (hasSink)
						auto result = pipeline!Stage(sourcePipeline, sinkPipeline.pipe!NextStage(nextArgs), args);
					else
						auto result = pipeline!Stage(sourcePipeline, pipeline!NextStage(null, null, nextArgs), args);
				}
			}
			static if (isSource!NextStage || isSink!FirstStage)
				return result;
			else
				result.run();
		} else {
			import std.string : capitalize;
			import flod.adapter;
			enum adapterName = Traits!LastStage.sourceMethodStr ~ Traits!NextStage.sinkMethodStr.capitalize();
			mixin(`return this.` ~ adapterName ~ `.pipe!NextStage(nextArgs);`);
		}
	}

	static if (is(Type)) {
		/// Constructs the stages of the source and sink pipelines inside `t`,
		/// then `t` itself with the stored arguments.
		void construct()(ref Type t)
		{
			static if (hasSource)
				sourcePipeline.construct(t.source);
			static if (hasSink)
				sinkPipeline.construct(t.sink);
			constructInPlace(t, args);
		}
	}

	static if (!isSink!FirstStage && !isSource!LastStage) {
		/// Instantiates the pipeline and runs it.
		void run()()
		{
			Type t;
			construct(t);
			t.run();
		}
	}

	static if (!isSink!FirstStage && !isActiveSource!LastStage) {
		/// Instantiates the pipeline and returns it without running it.
		auto create()()
		{
			Type t;
			construct(t);
			return t;
		}
	}
}
907 
// Creates the Pipeline node for `Stage` from the given source pipeline, sink
// pipeline and stage constructor arguments, deducing their types.
private auto pipeline(alias Stage, SoP, SiP, A...)(auto ref SoP sourcePipeline, auto ref SiP sinkPipeline, auto ref A args)
{
	alias Result = Pipeline!(Stage, SoP, SiP, A);
	return Result(sourcePipeline, sinkPipeline, args);
}
912 
// Evaluates to `test!(P.LastStage)` if `P` is an instance of `Pipeline`, and to
// `false` for any other type.
private template testPipeline(P, alias test) {
	static if (is(P == Pipeline!A, A...))
		enum testPipeline = test!(P.LastStage);
	else
		enum testPipeline = false;
}
919 
/// True if `P` is a dynamic array, or a pipeline whose last stage is a peek source.
enum isPeekPipeline(P) = isDynamicArray!P || testPipeline!(P, isPeekSource);

/// True if `P` is an input range, or a pipeline whose last stage is a pull source.
enum isPullPipeline(P) = isInputRange!P || testPipeline!(P, isPullSource);

/// True if `P` is a pipeline whose last stage is a push source.
enum isPushPipeline(P) = testPipeline!(P, isPushSource);

/// True if `P` is a pipeline whose last stage is an alloc source.
enum isAllocPipeline(P) = testPipeline!(P, isAllocSource);

/// True if `P` satisfies any of the four pipeline checks above.
enum isPipeline(P) = isPushPipeline!P || isPullPipeline!P || isPeekPipeline!P || isAllocPipeline!P;
929 
/** Starts a pipeline.
 *
 *  If `Stage` is a sink and the first argument is a dynamic array or an input
 *  range, that argument is turned into a pipeline first, and `Stage` is piped
 *  to it with the remaining arguments. In all other cases a new pipeline
 *  consisting of `Stage` alone is created, with `args` as its constructor
 *  arguments.
 */
auto pipe(alias Stage, Args...)(auto ref Args args)
	if (isSink!Stage || isSource!Stage)
{
	static if (isSink!Stage && Args.length > 0) {
		alias Head = Args[0];
		static if (isDynamicArray!Head)
			return pipeFromArray(args[0]).pipe!Stage(args[1 .. $]);
		else static if (isInputRange!Head)
			return pipeFromInputRange(args[0]).pipe!Stage(args[1 .. $]);
		else
			return pipeline!Stage(null, null, args);
	} else {
		return pipeline!Stage(null, null, args);
	}
}
941 
unittest {
	// a lone peek or pull source is classified accordingly and exposes its element type
	auto p1 = pipe!TestPeekSource(Arg!TestPeekSource());
	static assert(isPeekPipeline!(typeof(p1)));
	static assert(is(p1.ElementType == ulong));
	auto p2 = pipe!TestPullSource(Arg!TestPullSource());
	static assert(isPullPipeline!(typeof(p2)));
	static assert(is(p2.ElementType == ulong));
}

unittest {
	// a lone push or alloc source is classified accordingly
	auto p1 = pipe!TestPushSource(Arg!TestPushSource());
	static assert(isPushPipeline!(typeof(p1)));
	auto p2 = pipe!TestAllocSource(Arg!TestAllocSource());
	static assert(isAllocPipeline!(typeof(p2)));
}
957 
// The chains below connect only stages with matching methods.
// Each list names the source, then the filters, then the sink.

unittest {
	// compatible source-sink pairs
	testChain!`peek,peek`;
	testChain!`pull,pull`;
	testChain!`push,push`;
	testChain!`alloc,alloc`;
}

unittest {
	// compatible, with 1 filter
	testChain!`peek,peek,peek`;
	testChain!`peek,peekPull,pull`;
	testChain!`peek,peekPush,push`;
	testChain!`peek,peekAlloc,alloc`;
	testChain!`pull,pullPeek,peek`;
	testChain!`pull,pull,pull`;
	testChain!`pull,pullPush,push`;
	testChain!`pull,pullAlloc,alloc`;
	testChain!`push,pushPeek,peek`;
	testChain!`push,pushPull,pull`;
	testChain!`push,push,push`;
	testChain!`push,pushAlloc,alloc`;
	testChain!`alloc,allocPeek,peek`;
	testChain!`alloc,allocPull,pull`;
	testChain!`alloc,allocPush,push`;
	testChain!`alloc,alloc,alloc`;
}

unittest {
	// just one active sink at the end
	testChain!`peek,peek,peek,peek,peek`;
	testChain!`peek,peek,peekPull,pull,pull`;
	testChain!`pull,pull,pull,pull,pull`;
	testChain!`pull,pull,pullPeek,peek,peek`;
}

unittest {
	// just one active source at the beginning
	testChain!`push,push,push,push,push`;
	testChain!`push,push,pushAlloc,alloc,alloc`;
	testChain!`alloc,alloc,alloc,alloc,alloc`;
	testChain!`alloc,alloc,allocPush,push,push`;
}

unittest {
	// convert passive source to active source, longer chains
	testChain!`pull,pullPeek,peekAlloc,allocPush,push`;
	testChain!`pull,pullPeek,peekPush,pushAlloc,alloc`;
	testChain!`peek,peekPull,pullPush,pushAlloc,alloc`;
	testChain!`peek,peekPull,pullAlloc,allocPush,push`;
}

unittest {
	// convert active source to passive source at stage 2, longer passive source chain
	testChain!`push,pushPull,pull,pull,pullPeek,peek,peekPush,push,push`;
}

unittest {
	// convert active source to passive source at stage >2 (longer active source chain)
	testChain!`push,push,pushPull,pull`;
	testChain!`push,push,push,push,push,pushPull,pull`;
	testChain!`push,push,pushAlloc,alloc,alloc,allocPeek,peek`;
}

unittest {
	// multiple inverters
	testChain!`alloc,allocPeek,peekPush,pushPull,pull`;
	testChain!`alloc,alloc,alloc,allocPeek,peek,peekPush,push,pushPull,pull`;
	testChain!`alloc,alloc,allocPeek,peekPush,pushPull,pull`;
	testChain!`alloc,alloc,alloc,allocPeek,peekPush,pushPull,pullPush,push,pushAlloc,alloc,allocPush,pushPeek,peekAlloc,allocPull,pull`;
}
1029 
// The chains below start with a source whose method differs from the sink
// method of the next stage.

unittest {
	// implicit adapters, pull->push
	testChain!`pull,push`;
	testChain!`pull,push,push`;
	testChain!`pull,pushPeek,peek`;
	testChain!`pull,pushPull,pull`;
	testChain!`pull,pushAlloc,alloc`;
}

unittest {
	// implicit adapters, pull->peek
	testChain!`pull,peek`;
	testChain!`pull,peekPush,push`;
	testChain!`pull,peek,peek`;
	testChain!`pull,peekPull,pull`;
	testChain!`pull,peekAlloc,alloc`;
}

unittest {
	// implicit adapters, pull->alloc
	testChain!`pull,alloc`;
	testChain!`pull,allocPush,push`;
	testChain!`pull,allocPeek,peek`;
	testChain!`pull,allocPull,pull`;
	testChain!`pull,alloc,alloc`;
}

unittest {
	// implicit adapters, push->pull
	testChain!`push,pull`;
	testChain!`push,pullPush,push`;
	testChain!`push,pullAlloc,alloc`;
	testChain!`push,pullPeek,peek`;
	testChain!`push,pull,pull`;
}

unittest {
	// implicit adapters, push->peek
	testChain!`push,peek`;
	testChain!`push,peekPush,push`;
	testChain!`push,peekAlloc,alloc`;
	testChain!`push,peek,peek`;
	testChain!`push,peekPull,pull`;
}

unittest {
	// implicit adapters, push->alloc
	testChain!`push,alloc`;
	testChain!`push,allocPush,push`;
	testChain!`push,allocPeek,peek`;
	testChain!`push,allocPull,pull`;
	testChain!`push,alloc,alloc`;
}

unittest {
	// implicit adapters, peek->pull
	testChain!`peek,pull`;
	testChain!`peek,pullPush,push`;
	testChain!`peek,pullAlloc,alloc`;
	testChain!`peek,pullPeek,peek`;
	testChain!`peek,pull,pull`;
}

unittest {
	// implicit adapters, peek->push
	testChain!`peek,push`;
	testChain!`peek,push,push`;
	testChain!`peek,pushAlloc,alloc`;
	testChain!`peek,pushPeek,peek`;
	testChain!`peek,pushPull,pull`;
}

unittest {
	// implicit adapters, peek->alloc
	testChain!`peek,alloc`;
	testChain!`peek,allocPush,push`;
	testChain!`peek,allocPeek,peek`;
	testChain!`peek,allocPull,pull`;
	testChain!`peek,alloc,alloc`;
}

unittest {
	// implicit adapters, alloc->peek
	testChain!`alloc,peek`;
	testChain!`alloc,peekPush,push`;
	testChain!`alloc,peekAlloc,alloc`;
	testChain!`alloc,peek,peek`;
	testChain!`alloc,peekPull,pull`;
}

unittest {
	// implicit adapters, alloc->pull
	testChain!`alloc,pull`;
	testChain!`alloc,pullPush,push`;
	testChain!`alloc,pullAlloc,alloc`;
	testChain!`alloc,pullPeek,peek`;
	testChain!`alloc,pull,pull`;
}

unittest {
	// implicit adapters, alloc->push
	testChain!`alloc,push`;
	testChain!`alloc,push,push`;
	testChain!`alloc,pushAlloc,alloc`;
	testChain!`alloc,pushPeek,peek`;
	testChain!`alloc,pushPull,pull`;
}

unittest {
	// implicit adapters, all in one pipeline
	testChain!`alloc,push,peek,pull,alloc,peek,push,pull,peek,alloc,pull,push,peek`;
}
1142 
unittest {
	// a dynamic array can be used directly as the source of a pipeline
	auto array = [ 1UL, 0xdead, 6 ];
	assert(isPeekPipeline!(typeof(array)));
	outputArray.length = 4;
	outputIndex = 0;
	array.pipe!TestPeekSink();
	assert(outputArray[0 .. outputIndex] == array[]);
}

unittest {
	// an input range can be used directly as the source of a pipeline
	import std.range : iota, array, take;
	import std.algorithm : equal;
	auto r = iota(37UL, 1337);
	static assert(isPullPipeline!(typeof(r)));
	outputArray.length = 5000;
	outputIndex = 0;
	r.pipe!TestPullSink();
	assert(outputArray[0 .. outputIndex] == iota(37, 1337).array());
	// output shorter than the range: only the first 20 elements arrive
	r = iota(55UL, 1555);
	outputArray.length = 20;
	outputIndex = 0;
	r.pipe!TestPullSink();
	assert(outputArray[0 .. outputIndex] == iota(55, 1555).take(20).array());
}