1 /** Pipeline composition.
2  *
3  *  Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
4  *  Copyright: © 2016 Adrian Matoga
5  *  License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
6  */
7 module flod.pipeline;
8 
9 public import std.typecons : Flag, Yes, No;
10 import flod.traits;
11 import flod.meta : NonCopyable, str;
12 
13 version(unittest) {
14 	import std.algorithm : min, max, map, copy;
15 	import std.conv : to;
16 	import std.experimental.logger : logf, errorf;
17 	import std.range : isInputRange, ElementType, array, take;
18 	import std.string : split, toLower, startsWith, endsWith;
19 
20 	ulong[] inputArray;
21 	ulong[] outputArray;
22 	size_t outputIndex;
23 
24 	uint filterMark(string f) {
25 		f = f.toLower;
26 		uint fm;
27 		if (f.startsWith("pull"))
28 			fm = 1;
29 		else if (f.startsWith("push"))
30 			fm = 2;
31 		else if (f.startsWith("alloc"))
32 			fm = 3;
33 		if (f.endsWith("pull"))
34 			fm |= 1 << 2;
35 		else if (f.endsWith("push"))
36 			fm |= 2 << 2;
37 		else if (f.endsWith("alloc"))
38 			fm |= 3 << 2;
39 		return fm;
40 	}
41 
42 	ulong filter(string f)(ulong a) {
43 		enum fm = filterMark(f);
44 		return (a << 4) | fm;
45 	}
46 
47 	// sources:
48 	struct Arg(alias T) { bool constructed = false; }
49 
50 	mixin template TestStage(N...) {
51 		alias This = typeof(this);
52 		static if (is(This == A!(B, C), alias A, B, C))
53 			alias Stage = A;
54 		else static if (is(This == D!(E), alias D, E))
55 			alias Stage = D;
56 		else static if (is(This == F!G, alias F, alias G))
57 			alias Stage = F;
58 		else static if (is(This))
59 			alias Stage = This;
60 		else
61 			static assert(0, "don't know how to get stage from " ~ This.stringof ~ " (" ~ str!This ~ ")");
62 
63 		@disable this(this);
64 		@disable void opAssign(typeof(this));
65 
66 		// this is to ensure that construct() calls the right constructor for each stage
67 		this(Arg!Stage arg) { this.arg = arg; this.arg.constructed = true; }
68 		Arg!Stage arg;
69 	}
70 
71 	@pullSource!ulong
72 	struct TestPullSource {
73 		mixin TestStage;
74 
75 		size_t pull(ulong[] buf)
76 		{
77 			auto len = min(buf.length, inputArray.length);
78 			buf[0 .. len] = inputArray[0 .. len];
79 			inputArray = inputArray[len .. $];
80 			return len;
81 		}
82 	}
83 
84 	@peekSource!ulong
85 	struct TestPeekSource {
86 		mixin TestStage;
87 
88 		const(ulong)[] peek(size_t n)
89 		{
90 			auto len = min(max(n, 2909), inputArray.length);
91 			return inputArray[0 .. len];
92 		}
93 
94 		void consume(size_t n) { inputArray = inputArray[n .. $]; }
95 	}
96 
97 	@pushSource!ulong
98 	struct TestPushSource(Sink) {
99 		mixin TestStage;
100 		Sink sink;
101 
102 		void run()()
103 		{
104 			while (inputArray.length) {
105 				auto len = min(1337, inputArray.length);
106 				if (sink.push(inputArray[0 .. len]) != len)
107 					break;
108 				inputArray = inputArray[len .. $];
109 			}
110 		}
111 	}
112 
113 	@allocSource!ulong
114 	struct TestAllocSource(Sink) {
115 		mixin TestStage;
116 		Sink sink;
117 
118 		void run()()
119 		{
120 			ulong[] buf;
121 			while (inputArray.length) {
122 				auto len = min(1337, inputArray.length);
123 				if (!sink.alloc(buf, len))
124 					assert(0);
125 				buf[0 .. len] = inputArray[0 .. len];
126 				if (sink.commit(len) != len)
127 					break;
128 				inputArray = inputArray[len .. $];
129 			}
130 		}
131 	}
132 
133 	// sinks:
134 
135 	@pullSink!ulong
136 	struct TestPullSink(Source) {
137 		mixin TestStage;
138 		Source source;
139 
140 		void run()
141 		{
142 			while (outputIndex < outputArray.length) {
143 				auto len = min(4157, outputArray.length - outputIndex);
144 				auto pd = source.pull(outputArray[outputIndex .. outputIndex + len]);
145 				outputIndex += pd;
146 				if (pd < len)
147 					break;
148 			}
149 		}
150 	}
151 
152 	@peekSink!ulong
153 	struct TestPeekSink(Source) {
154 		mixin TestStage;
155 		Source source;
156 
157 		void run()
158 		{
159 			while (outputIndex < outputArray.length) {
160 				auto len = min(4157, outputArray.length - outputIndex);
161 				auto ib = source.peek(len);
162 				auto olen = min(len, ib.length, 6379);
163 				outputArray[outputIndex .. outputIndex + olen] = ib[0 .. olen];
164 				outputIndex += olen;
165 				source.consume(olen);
166 				if (olen < len)
167 					break;
168 			}
169 		}
170 	}
171 
172 	@pushSink!ulong
173 	struct TestPushSink {
174 		mixin TestStage;
175 
176 		size_t push(const(ulong)[] buf)
177 		{
178 			auto len = min(buf.length, outputArray.length - outputIndex);
179 			if (len) {
180 				outputArray[outputIndex .. outputIndex + len] = buf[0 .. len];
181 				outputIndex += len;
182 			}
183 			return len;
184 		}
185 	}
186 
187 	@allocSink!ulong
188 	struct TestAllocSink {
189 		mixin TestStage;
190 		ulong[] last;
191 
192 		bool alloc(ref ulong[] buf, size_t n)
193 		{
194 			if (n < outputArray.length - outputIndex)
195 				buf = outputArray[outputIndex .. outputIndex + n];
196 			else
197 				buf = last = new ulong[n];
198 			return true;
199 		}
200 
201 		size_t commit(size_t n)
202 		out(result) { assert(result <= n); }
203 		body
204 		{
205 			if (!last) {
206 				outputIndex += n;
207 				return n;
208 			} else {
209 				auto len = min(n, outputArray.length - outputIndex);
210 				outputArray[outputIndex .. outputIndex + len] = last[0 .. len];
211 				outputIndex += len;
212 				return len;
213 			}
214 		}
215 	}
216 
217 	// filter
218 
219 	@peekSink!ulong @peekSource!ulong
220 	struct TestPeekFilter(Source) {
221 		mixin TestStage;
222 		Source source;
223 		const(ulong)[] peek(size_t n)
224 		{
225 			return source.peek(n).map!(filter!"peek").array();
226 		}
227 		void consume(size_t n) { source.consume(n); }
228 	}
229 
230 	@peekSink!ulong @pullSource!ulong
231 	struct TestPeekPullFilter(Source) {
232 		mixin TestStage;
233 		Source source;
234 		size_t pull(ulong[] buf)
235 		{
236 			auto ib = source.peek(buf.length);
237 			auto len = min(ib.length, buf.length);
238 			ib.take(len).map!(filter!"peekPull").copy(buf);
239 			source.consume(len);
240 			return len;
241 		}
242 	}
243 
244 	@peekSink!ulong @pushSource!ulong
245 	struct TestPeekPushFilter(Source, Sink) {
246 		mixin TestStage;
247 		Source source;
248 		Sink sink;
249 		void run()()
250 		{
251 			for (;;) {
252 				auto ib = source.peek(4096);
253 				auto ob = ib.map!(filter!"peekPush").array();
254 				source.consume(ib.length);
255 				if (sink.push(ob) < 4096)
256 					break;
257 			}
258 		}
259 	}
260 
261 	@peekSink!ulong @allocSource!ulong
262 	struct TestPeekAllocFilter(Source, Sink) {
263 		mixin TestStage;
264 		Source source;
265 		Sink sink;
266 		void run()()
267 		{
268 			ulong[] buf;
269 			for (;;) {
270 				auto ib = source.peek(4096);
271 				if (!sink.alloc(buf, ib.length))
272 					assert(0);
273 				auto len = min(ib.length, buf.length);
274 				ib.take(len).map!(filter!"peekAlloc").copy(buf);
275 				source.consume(len);
276 				if (sink.commit(len) < 4096)
277 					break;
278 			}
279 		}
280 	}
281 
282 	@pullSink!ulong @pullSource!ulong
283 	struct TestPullFilter(Source) {
284 		mixin TestStage;
285 		Source source;
286 		size_t pull(ulong[] buf)
287 		{
288 			size_t n = source.pull(buf);
289 			foreach (ref b; buf[0 .. n])
290 				b = b.filter!"pull";
291 			return n;
292 		}
293 	}
294 
295 	@pullSink!ulong @peekSource!ulong
296 	struct TestPullPeekFilter(Source) {
297 		mixin TestStage;
298 		Source source;
299 		const(ulong)[] peek(size_t n)
300 		{
301 			auto buf = new ulong[n];
302 			size_t m = source.pull(buf[]);
303 			foreach (ref b; buf[0 .. m])
304 				b = b.filter!"pullPeek";
305 			return buf[0 .. m];
306 		}
307 		void consume(size_t n) {}
308 	}
309 
310 	@pullSink!ulong @pushSource!ulong
311 	struct TestPullPushFilter(Source, Sink) {
312 		mixin TestStage;
313 		Source source;
314 		Sink sink;
315 		void run()()
316 		{
317 			for (;;) {
318 				ulong[4096] buf;
319 				auto n = source.pull(buf[]);
320 				foreach (ref b; buf[0 .. n])
321 					b = b.filter!"pullPush";
322 				if (sink.push(buf[0 .. n]) < 4096)
323 					break;
324 			}
325 		}
326 	}
327 
328 	@pullSink!ulong @allocSource!ulong
329 	struct TestPullAllocFilter(Source, Sink) {
330 		mixin TestStage;
331 		Source source;
332 		Sink sink;
333 		void run()()
334 		{
335 			for (;;) {
336 				ulong[] buf;
337 				if (!sink.alloc(buf, 4096))
338 					assert(0);
339 				auto n = source.pull(buf[]);
340 				foreach (ref b; buf[0 .. n])
341 					b = b.filter!"pullAlloc";
342 				if (sink.commit(n) < 4096)
343 					break;
344 			}
345 		}
346 	}
347 
348 	@pushSink!ulong @pushSource!ulong
349 	struct TestPushFilter(Sink) {
350 		mixin TestStage;
351 		Sink sink;
352 		size_t push(const(ulong)[] buf)
353 		{
354 			return sink.push(buf.map!(filter!"push").array());
355 		}
356 	}
357 
358 	@pushSink!ulong @allocSource!ulong
359 	struct TestPushAllocFilter(Sink) {
360 		mixin TestStage;
361 		Sink sink;
362 		size_t push(const(ulong)[] buf)
363 		out(result) { assert(result <= buf.length); }
364 		body
365 		{
366 			ulong[] ob;
367 			if (!sink.alloc(ob, buf.length))
368 				assert(0);
369 			auto len = min(buf.length, ob.length);
370 			buf.take(len).map!(filter!"pushAlloc").copy(ob);
371 			return sink.commit(len);
372 		}
373 	}
374 
375 	@pushSink!ulong @pullSource!ulong
376 	struct TestPushPullFilter(alias Scheduler) {
377 		mixin Scheduler;
378 		mixin TestStage;
379 		ulong[] buffer;
380 
381 		size_t push(const(ulong)[] buf)
382 		{
383 			buffer ~= buf.map!(filter!"pushPull").array();
384 			if (yield())
385 				return 0;
386 			return buf.length;
387 		}
388 
389 		size_t pull(ulong[] buf)
390 		{
391 			size_t n = buf.length;
392 			while (buffer.length < n) {
393 				if (yield())
394 					break;
395 			}
396 			size_t len = min(n, buffer.length);
397 			buf[0 .. len] = buffer[0 .. len];
398 			buffer = buffer[len .. $];
399 			return len;
400 		}
401 	}
402 
403 	@pushSink!ulong @peekSource!ulong
404 	struct TestPushPeekFilter(alias Scheduler) {
405 		mixin Scheduler;
406 		mixin TestStage;
407 		ulong[] buffer;
408 
409 		size_t push(const(ulong)[] buf)
410 		{
411 			buffer ~= buf.map!(filter!"pushPeek").array();
412 			if (yield())
413 				return 0;
414 			return buf.length;
415 		}
416 
417 		const(ulong)[] peek(size_t n)
418 		{
419 			while (buffer.length < n) {
420 				if (yield())
421 					break;
422 			}
423 			return buffer;
424 		}
425 
426 		void consume(size_t n)
427 		{
428 			buffer = buffer[n .. $];
429 		}
430 	}
431 
432 	@allocSink!ulong @allocSource!ulong
433 	struct TestAllocFilter(Sink) {
434 		mixin TestStage;
435 		Sink sink;
436 		ulong[] buf;
437 
438 		bool alloc(ref ulong[] buf, size_t n)
439 		{
440 			auto r = sink.alloc(buf, n);
441 			this.buf = buf;
442 			return r;
443 		}
444 
445 		size_t commit(size_t n)
446 		{
447 			foreach (ref b; buf[0 .. n])
448 				b = b.filter!"alloc";
449 			return sink.commit(n);
450 		}
451 	}
452 
453 	@allocSink!ulong @pushSource!ulong
454 	struct TestAllocPushFilter(Sink) {
455 		mixin TestStage;
456 		Sink sink;
457 		ulong[] buffer;
458 
459 		bool alloc(ref ulong[] buf, size_t n)
460 		{
461 			buffer = buf = new ulong[n];
462 			return true;
463 		}
464 
465 		size_t commit(size_t n)
466 		{
467 			size_t m = sink.push(buffer[0 .. n].map!(filter!"allocPush").array());
468 			buffer = buffer[m .. $];
469 			return m;
470 		}
471 	}
472 
473 	@allocSink!ulong @pullSource!ulong
474 	struct TestAllocPullFilter(alias Scheduler) {
475 		mixin Scheduler;
476 		mixin TestStage;
477 		ulong[] buffer;
478 		size_t readOffset;
479 		size_t writeOffset;
480 
481 		bool alloc(ref ulong[] buf, size_t n)
482 		{
483 			buffer.length = writeOffset + n;
484 			buf = buffer[writeOffset .. $];
485 			return true;
486 		}
487 
488 		size_t commit(size_t n)
489 		{
490 			foreach (ref b; buffer[writeOffset .. writeOffset + n])
491 				b = b.filter!"allocPull";
492 			writeOffset += n;
493 			if (yield())
494 				return 0;
495 			return n;
496 		}
497 
498 		size_t pull(ulong[] buf)
499 		{
500 			size_t n = buf.length;
501 			while (writeOffset - readOffset < n) {
502 				if (yield())
503 					break;
504 			}
505 			size_t len = min(n, writeOffset - readOffset);
506 			buf[0 .. len] = buffer[readOffset .. readOffset + len];
507 			readOffset += len;
508 			return len;
509 		}
510 	}
511 
512 	@allocSink!ulong @peekSource!ulong
513 	struct TestAllocPeekFilter(alias Scheduler) {
514 		mixin Scheduler;
515 		mixin TestStage;
516 		ulong[] buffer;
517 		size_t readOffset;
518 		size_t writeOffset;
519 
520 		bool alloc(ref ulong[] buf, size_t n)
521 		{
522 			buffer.length = writeOffset + n;
523 			buf = buffer[writeOffset .. $];
524 			return true;
525 		}
526 
527 		size_t commit(size_t n)
528 		{
529 			foreach (ref b; buffer[writeOffset .. writeOffset + n])
530 				b = b.filter!"allocPeek";
531 			writeOffset += n;
532 			if (yield())
533 				return 0;
534 			return n;
535 		}
536 
537 		const(ulong)[] peek(size_t n)
538 		{
539 			while (writeOffset - readOffset < n) {
540 				if (yield())
541 					break;
542 			}
543 			return buffer[readOffset .. writeOffset];
544 		}
545 
546 		void consume(size_t n)
547 		{
548 			readOffset += n;
549 		}
550 	}
551 
552 	string genStage(string filter, string suf)
553 	{
554 		import std.ascii : toUpper;
555 		auto cf = filter[0].toUpper ~ filter[1 .. $];
556 		return "pipe!Test" ~ cf ~ suf ~ "(Arg!Test" ~ cf ~ suf ~ "())";
557 	}
558 
559 	string genChain(string filterList)
560 	{
561 		import std.algorithm : map;
562 		import std.array : join, split;
563 		auto filters = filterList.split(",");
564 		string midstr;
565 		if (filters.length > 2)
566 			midstr = filters[1 .. $ - 1].map!(f => "." ~ genStage(f, "Filter")).join;
567 		return genStage(filters[0], "Source")
568 			~ midstr
569 			~ "." ~ genStage(filters[$ - 1], "Sink") ~ ";";
570 	}
571 
572 	void testChain(string filterlist, R)(R r)
573 		if (isInputRange!R && is(ElementType!R : ulong))
574 	{
575 		auto input = r.map!(a => ulong(a)).array();
576 		logf("Testing %s with %d elements", filterlist, input.length);
577 		auto expectedOutput = input.dup;
578 		auto filters = filterlist.split(",");
579 		if (filters.length > 2) {
580 			foreach (filter; filters[1 .. $ - 1]) {
581 				auto fm = filterMark(filter);
582 				foreach (ref eo; expectedOutput)
583 					eo = (eo << 4) | fm;
584 			}
585 		}
586 		foreach(expectedLength; [ size_t(0), input.length / 3, input.length - 1, input.length,
587 			input.length + 1, input.length * 5 ]) {
588 			outputArray.length = expectedLength;
589 			outputArray[] = 0xbadc0ffee0ddf00d;
590 			inputArray = input;
591 			outputIndex = 0;
592 			mixin(genChain(filterlist));
593 			auto len = min(outputIndex, expectedLength, input.length);
594 			uint left = 8;
595 			size_t all = 0;
596 			if (outputIndex != min(expectedLength, input.length)) {
597 				errorf("Output length is %d, expected %d", outputIndex, min(expectedLength, input.length));
598 				assert(0);
599 			}
600 			for (size_t i = 0; i < len; i++) {
601 				if (expectedOutput[i] != outputArray[i]) {
602 					if (left > 0) {
603 						logf("expected[%d] != output[%d]: %x vs. %x", i, i, expectedOutput[i], outputArray[i]);
604 						--left;
605 					}
606 					all++;
607 				}
608 			}
609 			if (all > 0) {
610 				logf("%s", genChain(filterlist));
611 				logf("total: %d differences", all);
612 			}
613 			assert(all == 0);
614 		}
615 	}
616 
617 	void testChain(string filterlist)()
618 	{
619 		import std.range : iota;
620 		testChain!filterlist(iota(0, 173447));
621 	}
622 
623 }
624 
625 struct SinkDrivenFiberScheduler {
626 	import std.stdio;
627 	import core.thread : Fiber;
628 	Fiber fiber;
629 	mixin NonCopyable;
630 
631 	void stop()
632 	{
633 		auto f = this.fiber;
634 		if (f) {
635 			if (f.state == Fiber.State.HOLD) {
636 				this.fiber = null;
637 				f.call();
638 			}
639 			auto x = f.state;
640 			assert(f.state == Fiber.State.TERM);
641 		}
642 	}
643 
644 	int yield()
645 	{
646 		if (fiber is null)
647 			return 2;
648 		if (fiber.state == Fiber.State.EXEC) {
649 			Fiber.yield();
650 			return fiber is null;
651 		} else {
652 			if (fiber.state == Fiber.State.HOLD)
653 				fiber.call();
654 			return fiber.state != Fiber.State.HOLD;
655 		}
656 	}
657 }
658 
659 mixin template FiberScheduler() {
660 	import flod.pipeline;
661 	SinkDrivenFiberScheduler _flod_scheduler;
662 
663 	int yield() { return _flod_scheduler.yield(); }
664 	void spawn(void delegate() dg)
665 	{
666 		import core.thread : Fiber;
667 		if (!_flod_scheduler.fiber)
668 			_flod_scheduler.fiber = new Fiber(dg, 65536);
669 	}
670 	void stop() { _flod_scheduler.stop(); }
671 }
672 
673 // forwards all calls to its impl
674 private struct Forward(S, Flag!"readFromSink" readFromSink = No.readFromSink) {
675 	enum name = .str!S;
676 	S _impl;
677 	this(Args...)(auto ref Args args)
678 	{
679 		static if (Args.length > 0)
680 			_impl.__ctor(args);
681 	}
682 	~this()
683 	{
684 		debug(FlodTraceLifetime) {
685 			import std.experimental.logger : tracef;
686 			tracef("Destroy %s", name);
687 		}
688 	}
689 	@property ref auto sink()() { return _impl.sink; }
690 	@property ref auto source()() { return _impl.source; }
691 	static if (readFromSink) {
692 		// active source needs to pass pull calls to any push-pull filter at the end of
693 		// the active source chain, so that an inverter wrapping the whole chain can recover
694 		// data sunk into the filter.
695 		auto peek()(size_t n) { return _impl.sink.peek(n); }
696 		void consume()(size_t n) { _impl.sink.consume(n); }
697 		auto pull(T)(T[] buf) { return _impl.sink.pull(buf); }
698 		void spawn()(void delegate() dg) { return _impl.sink.spawn(dg); }
699 		void stop()() { return _impl.sink.stop(); }
700 	} else {
701 		auto peek()(size_t n) { return _impl.peek(n); }
702 		void consume()(size_t n) { _impl.consume(n); }
703 		auto pull(T)(T[] buf) { return _impl.pull(buf); }
704 		void spawn()(void delegate() dg) { return _impl.spawn(dg); }
705 		void stop()() { return _impl.stop(); }
706 	}
707 	auto run()() { return _impl.run(); }
708 	auto step(A...)(auto ref A a) { return _impl.step(a); }
709 	auto alloc(T)(ref T[] buf, size_t n) { return _impl.alloc(buf, n); }
710 	auto commit()(size_t n) { return _impl.commit(n); }
711 	auto push(T)(const(T)[] buf) { return _impl.push(buf); }
712 }
713 
714 private template Inverter(alias method, T) {
715 	@method
716 	struct Inverter(Source) {
717 		import core.thread : Fiber;
718 
719 		Source source;
720 
721 		~this()
722 		{
723 			source.sink.stop();
724 		}
725 
726 		void run()
727 		{
728 			source.run();
729 		}
730 
731 		size_t pull()(T[] buf)
732 		{
733 			source.sink.spawn(&run);
734 			return source.sink.pull(buf);
735 		}
736 
737 		const(T)[] peek()(size_t n)
738 		{
739 			source.sink.spawn(&run);
740 			return source.sink.peek(n);
741 		}
742 
743 		void consume()(size_t n) { source.sink.consume(n); }
744 	}
745 }
746 
747 private void constructInPlace(T, Args...)(ref T t, auto ref Args args)
748 {
749 	debug(FlodTraceLifetime) {
750 		import std.experimental.logger : tracef;
751 		tracef("Construct at %x..%x %s with %s", &t, &t + 1, T.name, Args.stringof);
752 	}
753 	static if (__traits(hasMember, t, "__ctor")) {
754 		t.__ctor(args);
755 	} else static if (Args.length > 0) {
756 		static assert(0, "Stage " ~ str!T ~ " does not have a non-trivial constructor" ~
757 			" but construction was requested with arguments " ~ Args.stringof);
758 	}
759 }
760 
761 private struct Pipeline(alias S, SoP, SiP, A...) {
762 	alias Stage = S;
763 	alias Args = A;
764 	alias SourcePipeline = SoP;
765 	alias SinkPipeline = SiP;
766 
767 	enum bool hasSource = !is(SourcePipeline == typeof(null));
768 	enum bool hasSink   = !is(SinkPipeline == typeof(null));
769 
770 	static if (hasSource) {
771 		alias FirstStage = SourcePipeline.FirstStage;
772 		enum sourcePipeStr = SourcePipeline.pipeStr ~ "->";
773 		enum sourceTreeStr(int indent) = SourcePipeline.treeStr!(indent + 1) ~ "\n";
774 		enum sourceStr = SourcePipeline.str ~ ".";
775 	} else {
776 		alias FirstStage = Stage;
777 		enum sourcePipeStr = "";
778 		enum sourceTreeStr(int indent) = "";
779 		enum sourceStr = "";
780 	}
781 
782 	static if (hasSink) {
783 		alias LastStage = SinkPipeline.LastStage;
784 		enum sinkPipeStr = "->" ~ SinkPipeline.pipeStr;
785 		enum sinkTreeStr(int indent) = "\n" ~ SinkPipeline.treeStr!(indent + 1);
786 		enum sinkStr = "." ~ SinkPipeline.str;
787 	} else {
788 		alias LastStage = Stage;
789 		enum sinkPipeStr = "";
790 		enum sinkTreeStr(int indent) = "";
791 		enum sinkStr = "";
792 	}
793 
794 	static if (is(Traits!LastStage.SourceElementType W))
795 		alias ElementType = W;
796 
797 	enum pipeStr = sourcePipeStr ~ .str!S ~ sinkPipeStr;
798 	enum treeStr(int indent) = sourceTreeStr!indent
799 		~ "|" ~ repeat!(indent, "-") ~ "-" ~ .str!S
800 		~ sinkTreeStr!indent;
801 	enum str = "(" ~ sourceStr ~ .str!Stage ~ sinkStr ~ ")";
802 
803 	static if (hasSink && is(SinkPipeline.Type T))
804 		alias SinkType = T;
805 	static if (hasSource && is(SourcePipeline.Type U))
806 		alias SourceType = U;
807 
808 	static if (isActiveSource!Stage && isActiveSink!Stage && is(SinkType) && is(SourceType))
809 		alias Type = Forward!(Stage!(SourceType, SinkType));
810 	else static if (isActiveSource!Stage && !isActiveSink!Stage && is(SinkType))
811 		alias Type = Forward!(Stage!SinkType, Yes.readFromSink);
812 	else static if (!isActiveSource!Stage && isActiveSink!Stage && is(SourceType))
813 		alias Type = Forward!(Stage!SourceType);
814 	else static if (isPassiveSink!Stage && isPassiveSource!Stage && is(Stage!FiberScheduler SF))
815 		alias Type = Forward!SF;
816 	else static if (is(Stage))
817 		alias Type = Forward!Stage;
818 	else static if (isPassiveSource!Stage && !isSink!Stage && is(SourceType)) // inverter
819 		alias Type = Forward!(Stage!SourceType);
820 
821 	SourcePipeline sourcePipeline;
822 	SinkPipeline sinkPipeline;
823 	Args args;
824 
825 	auto pipe(alias NextStage, NextArgs...)(auto ref NextArgs nextArgs)
826 	{
827 		alias SourceE = Traits!LastStage.SourceElementType;
828 		alias SinkE = Traits!NextStage.SinkElementType;
829 		static assert(is(SourceE == SinkE), "Incompatible element types: " ~
830 			.str!LastStage ~ " produces " ~ SourceE.stringof ~ ", while " ~
831 			.str!NextStage ~ " expects " ~ SinkE.stringof);
832 
833 		static if (areCompatible!(LastStage, NextStage)) {
834 			static if (isPassiveSource!Stage) {
835 				auto result = pipeline!NextStage(this, null, nextArgs);
836 			} else {
837 				static assert(isActiveSource!Stage);
838 				static if (isPassiveSink!NextStage && isPassiveSource!NextStage) {
839 					static if (isPassiveSink!Stage) {
840 						static assert(!hasSource);
841 						static if (hasSink) {
842 							auto result = pipeline!Stage(null, sinkPipeline.pipe!NextStage(nextArgs), args);
843 						} else {
844 							auto result = pipeline!Stage(null, pipeline!NextStage(null, null, nextArgs), args);
845 						}
846 					} else {
847 						static if (hasSink) {
848 							auto result = pipeline!(Inverter!(sourceMethod!NextStage, SourceE))(
849 								pipeline!Stage(sourcePipeline, sinkPipeline.pipe!NextStage(nextArgs), args),
850 								null);
851 						} else {
852 							auto result = pipeline!(Inverter!(sourceMethod!NextStage, SourceE))(
853 								pipeline!Stage(sourcePipeline, pipeline!NextStage(null, null, nextArgs), args),
854 								null);
855 						}
856 					}
857 				} else {
858 					static if (hasSink)
859 						auto result = pipeline!Stage(sourcePipeline, sinkPipeline.pipe!NextStage(nextArgs), args);
860 					else
861 						auto result = pipeline!Stage(sourcePipeline, pipeline!NextStage(null, null, nextArgs), args);
862 				}
863 			}
864 			static if (isSource!NextStage || isSink!FirstStage)
865 				return result;
866 			else
867 				result.run();
868 		} else {
869 			import std.string : capitalize;
870 			import flod.adapter;
871 			enum adapterName = Traits!LastStage.sourceMethodStr ~ Traits!NextStage.sinkMethodStr.capitalize();
872 			mixin(`return this.` ~ adapterName ~ `.pipe!NextStage(nextArgs);`);
873 		}
874 	}
875 
876 	static if (is(Type)) {
877 		void construct()(ref Type t)
878 		{
879 			static if (hasSource)
880 				sourcePipeline.construct(t.source);
881 			static if (hasSink)
882 				sinkPipeline.construct(t.sink);
883 			constructInPlace(t, args);
884 		}
885 	}
886 
887 	static if (!isSink!FirstStage && !isSource!LastStage) {
888 		void run()()
889 		{
890 			Type t;
891 			construct(t);
892 			t.run();
893 		}
894 	}
895 
896 	static if (!isSink!FirstStage && !isActiveSource!LastStage) {
897 		auto create()()
898 		{
899 			Type t;
900 			construct(t);
901 			return t;
902 		}
903 	}
904 }
905 
906 auto pipeline(alias Stage, SoP, SiP, A...)(auto ref SoP sourcePipeline, auto ref SiP sinkPipeline, auto ref A args)
907 {
908 	return Pipeline!(Stage, SoP, SiP, A)(sourcePipeline, sinkPipeline, args);
909 }
910 
911 template isPipeline(P, alias test) {
912 	static if (is(P == Pipeline!A, A...))
913 		enum isPipeline = test!(P.LastStage);
914 	else
915 		enum isPipeline = false;
916 }
917 
918 enum isPeekPipeline(P) = isPipeline!(P, isPeekSource);
919 
920 enum isPullPipeline(P) = isPipeline!(P, isPullSource);
921 
922 enum isPushPipeline(P) = isPipeline!(P, isPushSource);
923 
924 enum isAllocPipeline(P) = isPipeline!(P, isAllocSource);
925 
926 enum isPipeline(P) = isPushPipeline!P || isPullPipeline!P || isPeekPipeline!P || isAllocPipeline!P;
927 
928 auto pipe(alias Stage, Args...)(auto ref Args args)
929 	if (isSink!Stage || isSource!Stage)
930 {
931 	return pipeline!Stage(null, null, args);
932 }
933 
934 unittest {
935 	auto p1 = pipe!TestPeekSource(Arg!TestPeekSource());
936 	static assert(isPeekPipeline!(typeof(p1)));
937 	static assert(is(p1.ElementType == ulong));
938 	auto p2 = pipe!TestPullSource(Arg!TestPullSource());
939 	static assert(isPullPipeline!(typeof(p2)));
940 	static assert(is(p2.ElementType == ulong));
941 }
942 
943 unittest {
944 	auto p1 = pipe!TestPushSource(Arg!TestPushSource());
945 	static assert(isPushPipeline!(typeof(p1)));
946 	auto p2 = pipe!TestAllocSource(Arg!TestAllocSource());
947 	static assert(isAllocPipeline!(typeof(p2)));
948 }
949 
950 unittest {
951 	// compatible source-sink pairs
952 	testChain!`peek,peek`;
953 	testChain!`pull,pull`;
954 	testChain!`push,push`;
955 	testChain!`alloc,alloc`;
956 }
957 
958 unittest {
959 	// compatible, with 1 filter
960 	testChain!`peek,peek,peek`;
961 	testChain!`peek,peekPull,pull`;
962 	testChain!`peek,peekPush,push`;
963 	testChain!`peek,peekAlloc,alloc`;
964 	testChain!`pull,pullPeek,peek`;
965 	testChain!`pull,pull,pull`;
966 	testChain!`pull,pullPush,push`;
967 	testChain!`pull,pullAlloc,alloc`;
968 	testChain!`push,pushPeek,peek`;
969 	testChain!`push,pushPull,pull`;
970 	testChain!`push,push,push`;
971 	testChain!`push,pushAlloc,alloc`;
972 	testChain!`alloc,allocPeek,peek`;
973 	testChain!`alloc,allocPull,pull`;
974 	testChain!`alloc,allocPush,push`;
975 	testChain!`alloc,alloc,alloc`;
976 }
977 
978 unittest {
979 	// just one active sink at the end
980 	testChain!`peek,peek,peek,peek,peek`;
981 	testChain!`peek,peek,peekPull,pull,pull`;
982 	testChain!`pull,pull,pull,pull,pull`;
983 	testChain!`pull,pull,pullPeek,peek,peek`;
984 }
985 
986 unittest {
987 	// just one active source at the beginning
988 	testChain!`push,push,push,push,push`;
989 	testChain!`push,push,pushAlloc,alloc,alloc`;
990 	testChain!`alloc,alloc,alloc,alloc,alloc`;
991 	testChain!`alloc,alloc,allocPush,push,push`;
992 }
993 
994 unittest {
995 	// convert passive source to active source, longer chains
996 	testChain!`pull,pullPeek,peekAlloc,allocPush,push`;
997 	testChain!`pull,pullPeek,peekPush,pushAlloc,alloc`;
998 	testChain!`peek,peekPull,pullPush,pushAlloc,alloc`;
999 	testChain!`peek,peekPull,pullAlloc,allocPush,push`;
1000 }
1001 
1002 unittest {
1003 	// convert active source to passive source at stage 2, longer passive source chain
1004 	testChain!`push,pushPull,pull,pull,pullPeek,peek,peekPush,push,push`;
1005 }
1006 
1007 unittest {
1008 	// convert active source to passive source at stage >2 (longer active source chain)
1009 	testChain!`push,push,pushPull,pull`;
1010 	testChain!`push,push,push,push,push,pushPull,pull`;
1011 	testChain!`push,push,pushAlloc,alloc,alloc,allocPeek,peek`;
1012 }
1013 
1014 unittest {
1015 	// multiple inverters
1016 	testChain!`alloc,allocPeek,peekPush,pushPull,pull`;
1017 	testChain!`alloc,alloc,alloc,allocPeek,peek,peekPush,push,pushPull,pull`;
1018 	testChain!`alloc,alloc,allocPeek,peekPush,pushPull,pull`;
1019 	testChain!`alloc,alloc,alloc,allocPeek,peekPush,pushPull,pullPush,push,pushAlloc,alloc,allocPush,pushPeek,peekAlloc,allocPull,pull`;
1020 }
1021 
1022 unittest {
1023 	// implicit adapters, pull->push
1024 	testChain!`pull,push`;
1025 	testChain!`pull,push,push`;
1026 	testChain!`pull,pushPeek,peek`;
1027 	testChain!`pull,pushPull,pull`;
1028 	testChain!`pull,pushAlloc,alloc`;
1029 }
1030 
1031 unittest {
1032 	// implicit adapters, pull->peek
1033 	testChain!`pull,peek`;
1034 	testChain!`pull,peekPush,push`;
1035 	testChain!`pull,peek,peek`;
1036 	testChain!`pull,peekPull,pull`;
1037 	testChain!`pull,peekAlloc,alloc`;
1038 }
1039 
1040 unittest {
1041 	// implicit adapters, pull->alloc
1042 	testChain!`pull,alloc`;
1043 	testChain!`pull,allocPush,push`;
1044 	testChain!`pull,allocPeek,peek`;
1045 	testChain!`pull,allocPull,pull`;
1046 	testChain!`pull,alloc,alloc`;
1047 }
1048 
1049 unittest {
1050 	// implicit adapters, push->pull
1051 	testChain!`push,pull`;
1052 	testChain!`push,pullPush,push`;
1053 	testChain!`push,pullAlloc,alloc`;
1054 	testChain!`push,pullPeek,peek`;
1055 	testChain!`push,pull,pull`;
1056 }
1057 
1058 unittest {
1059 	// implicit adapters, push->peek
1060 	testChain!`push,peek`;
1061 	testChain!`push,peekPush,push`;
1062 	testChain!`push,peekAlloc,alloc`;
1063 	testChain!`push,peek,peek`;
1064 	testChain!`push,peekPull,pull`;
1065 }
1066 
1067 unittest {
1068 	// implicit adapters, push->alloc
1069 	testChain!`push,alloc`;
1070 	testChain!`push,allocPush,push`;
1071 	testChain!`push,allocPeek,peek`;
1072 	testChain!`push,allocPull,pull`;
1073 	testChain!`push,alloc,alloc`;
1074 }
1075 
1076 unittest {
1077 	// implicit adapters, peek->pull
1078 	testChain!`peek,pull`;
1079 	testChain!`peek,pullPush,push`;
1080 	testChain!`peek,pullAlloc,alloc`;
1081 	testChain!`peek,pullPeek,peek`;
1082 	testChain!`peek,pull,pull`;
1083 }
1084 
1085 unittest {
1086 	// implicit adapters, peek->push
1087 	testChain!`peek,push`;
1088 	testChain!`peek,push,push`;
1089 	testChain!`peek,pushAlloc,alloc`;
1090 	testChain!`peek,pushPeek,peek`;
1091 	testChain!`peek,pushPull,pull`;
1092 }
1093 
1094 unittest {
1095 	// implicit adapters, peek->alloc
1096 	testChain!`peek,alloc`;
1097 	testChain!`peek,allocPush,push`;
1098 	testChain!`peek,allocPeek,peek`;
1099 	testChain!`peek,allocPull,pull`;
1100 	testChain!`peek,alloc,alloc`;
1101 }
1102 
1103 unittest {
1104 	// implicit adapters, alloc->peek
1105 	testChain!`alloc,peek`;
1106 	testChain!`alloc,peekPush,push`;
1107 	testChain!`alloc,peekAlloc,alloc`;
1108 	testChain!`alloc,peek,peek`;
1109 	testChain!`alloc,peekPull,pull`;
1110 }
1111 
1112 unittest {
1113 	// implicit adapters, alloc->pull
1114 	testChain!`alloc,pull`;
1115 	testChain!`alloc,pullPush,push`;
1116 	testChain!`alloc,pullAlloc,alloc`;
1117 	testChain!`alloc,pullPeek,peek`;
1118 	testChain!`alloc,pull,pull`;
1119 }
1120 
1121 unittest {
1122 	// implicit adapters, alloc->push
1123 	testChain!`alloc,push`;
1124 	testChain!`alloc,push,push`;
1125 	testChain!`alloc,pushAlloc,alloc`;
1126 	testChain!`alloc,pushPeek,peek`;
1127 	testChain!`alloc,pushPull,pull`;
1128 }
1129 
1130 unittest {
1131 	// implicit adapters, all in one pipeline
1132 	testChain!`alloc,push,peek,pull,alloc,peek,push,pull,peek,alloc,pull,push,peek`;
1133 }