1 /** Pipeline composition. 2 * 3 * Authors: $(LINK2 https://github.com/epi, Adrian Matoga) 4 * Copyright: © 2016 Adrian Matoga 5 * License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0). 6 */ 7 module flod.pipeline; 8 9 public import std.typecons : Flag, Yes, No; 10 import flod.traits; 11 import flod.meta : NonCopyable, str; 12 13 version(unittest) { 14 import std.algorithm : min, max, map, copy; 15 import std.conv : to; 16 import std.experimental.logger : logf, errorf; 17 import std.range : isInputRange, ElementType, array, take; 18 import std.string : split, toLower, startsWith, endsWith; 19 20 ulong[] inputArray; 21 ulong[] outputArray; 22 size_t outputIndex; 23 24 uint filterMark(string f) { 25 f = f.toLower; 26 uint fm; 27 if (f.startsWith("pull")) 28 fm = 1; 29 else if (f.startsWith("push")) 30 fm = 2; 31 else if (f.startsWith("alloc")) 32 fm = 3; 33 if (f.endsWith("pull")) 34 fm |= 1 << 2; 35 else if (f.endsWith("push")) 36 fm |= 2 << 2; 37 else if (f.endsWith("alloc")) 38 fm |= 3 << 2; 39 return fm; 40 } 41 42 ulong filter(string f)(ulong a) { 43 enum fm = filterMark(f); 44 return (a << 4) | fm; 45 } 46 47 // sources: 48 struct Arg(alias T) { bool constructed = false; } 49 50 mixin template TestStage(N...) { 51 alias This = typeof(this); 52 static if (is(This == A!(B, C), alias A, B, C)) 53 alias Stage = A; 54 else static if (is(This == D!(E), alias D, E)) 55 alias Stage = D; 56 else static if (is(This == F!G, alias F, alias G)) 57 alias Stage = F; 58 else static if (is(This)) 59 alias Stage = This; 60 else 61 static assert(0, "don't know how to get stage from " ~ This.stringof ~ " (" ~ str!This ~ ")"); 62 63 @disable this(this); 64 @disable void opAssign(typeof(this)); 65 66 // this is to ensure that construct() calls the right constructor for each stage 67 this(Arg!Stage arg) { this.arg = arg; this.arg.constructed = true; } 68 Arg!Stage arg; 69 } 70 71 @pullSource!ulong 72 struct TestPullSource { 73 mixin TestStage; 74 75 size_t pull(ulong[] buf) 76 { 77 auto len = min(buf.length, inputArray.length); 78 buf[0 .. len] = inputArray[0 .. len]; 79 inputArray = inputArray[len .. $]; 80 return len; 81 } 82 } 83 84 @peekSource!ulong 85 struct TestPeekSource { 86 mixin TestStage; 87 88 const(ulong)[] peek(size_t n) 89 { 90 auto len = min(max(n, 2909), inputArray.length); 91 return inputArray[0 .. len]; 92 } 93 94 void consume(size_t n) { inputArray = inputArray[n .. $]; } 95 } 96 97 @pushSource!ulong 98 struct TestPushSource(Sink) { 99 mixin TestStage; 100 Sink sink; 101 102 void run()() 103 { 104 while (inputArray.length) { 105 auto len = min(1337, inputArray.length); 106 if (sink.push(inputArray[0 .. len]) != len) 107 break; 108 inputArray = inputArray[len .. $]; 109 } 110 } 111 } 112 113 @allocSource!ulong 114 struct TestAllocSource(Sink) { 115 mixin TestStage; 116 Sink sink; 117 118 void run()() 119 { 120 ulong[] buf; 121 while (inputArray.length) { 122 auto len = min(1337, inputArray.length); 123 if (!sink.alloc(buf, len)) 124 assert(0); 125 buf[0 .. len] = inputArray[0 .. len]; 126 if (sink.commit(len) != len) 127 break; 128 inputArray = inputArray[len .. $]; 129 } 130 } 131 } 132 133 // sinks: 134 135 @pullSink!ulong 136 struct TestPullSink(Source) { 137 mixin TestStage; 138 Source source; 139 140 void run() 141 { 142 while (outputIndex < outputArray.length) { 143 auto len = min(4157, outputArray.length - outputIndex); 144 auto pd = source.pull(outputArray[outputIndex .. outputIndex + len]); 145 outputIndex += pd; 146 if (pd < len) 147 break; 148 } 149 } 150 } 151 152 @peekSink!ulong 153 struct TestPeekSink(Source) { 154 mixin TestStage; 155 Source source; 156 157 void run() 158 { 159 while (outputIndex < outputArray.length) { 160 auto len = min(4157, outputArray.length - outputIndex); 161 auto ib = source.peek(len); 162 auto olen = min(len, ib.length, 6379); 163 outputArray[outputIndex .. outputIndex + olen] = ib[0 .. olen]; 164 outputIndex += olen; 165 source.consume(olen); 166 if (olen < len) 167 break; 168 } 169 } 170 } 171 172 @pushSink!ulong 173 struct TestPushSink { 174 mixin TestStage; 175 176 size_t push(const(ulong)[] buf) 177 { 178 auto len = min(buf.length, outputArray.length - outputIndex); 179 if (len) { 180 outputArray[outputIndex .. outputIndex + len] = buf[0 .. len]; 181 outputIndex += len; 182 } 183 return len; 184 } 185 } 186 187 @allocSink!ulong 188 struct TestAllocSink { 189 mixin TestStage; 190 ulong[] last; 191 192 bool alloc(ref ulong[] buf, size_t n) 193 { 194 if (n < outputArray.length - outputIndex) 195 buf = outputArray[outputIndex .. outputIndex + n]; 196 else 197 buf = last = new ulong[n]; 198 return true; 199 } 200 201 size_t commit(size_t n) 202 out(result) { assert(result <= n); } 203 body 204 { 205 if (!last) { 206 outputIndex += n; 207 return n; 208 } else { 209 auto len = min(n, outputArray.length - outputIndex); 210 outputArray[outputIndex .. outputIndex + len] = last[0 .. len]; 211 outputIndex += len; 212 return len; 213 } 214 } 215 } 216 217 // filter 218 219 @peekSink!ulong @peekSource!ulong 220 struct TestPeekFilter(Source) { 221 mixin TestStage; 222 Source source; 223 const(ulong)[] peek(size_t n) 224 { 225 return source.peek(n).map!(filter!"peek").array(); 226 } 227 void consume(size_t n) { source.consume(n); } 228 } 229 230 @peekSink!ulong @pullSource!ulong 231 struct TestPeekPullFilter(Source) { 232 mixin TestStage; 233 Source source; 234 size_t pull(ulong[] buf) 235 { 236 auto ib = source.peek(buf.length); 237 auto len = min(ib.length, buf.length); 238 ib.take(len).map!(filter!"peekPull").copy(buf); 239 source.consume(len); 240 return len; 241 } 242 } 243 244 @peekSink!ulong @pushSource!ulong 245 struct TestPeekPushFilter(Source, Sink) { 246 mixin TestStage; 247 Source source; 248 Sink sink; 249 void run()() 250 { 251 for (;;) { 252 auto ib = source.peek(4096); 253 auto ob = ib.map!(filter!"peekPush").array(); 254 source.consume(ib.length); 255 if (sink.push(ob) < 4096) 256 break; 257 } 258 } 259 } 260 261 @peekSink!ulong @allocSource!ulong 262 struct TestPeekAllocFilter(Source, Sink) { 263 mixin TestStage; 264 Source source; 265 Sink sink; 266 void run()() 267 { 268 ulong[] buf; 269 for (;;) { 270 auto ib = source.peek(4096); 271 if (!sink.alloc(buf, ib.length)) 272 assert(0); 273 auto len = min(ib.length, buf.length); 274 ib.take(len).map!(filter!"peekAlloc").copy(buf); 275 source.consume(len); 276 if (sink.commit(len) < 4096) 277 break; 278 } 279 } 280 } 281 282 @pullSink!ulong @pullSource!ulong 283 struct TestPullFilter(Source) { 284 mixin TestStage; 285 Source source; 286 size_t pull(ulong[] buf) 287 { 288 size_t n = source.pull(buf); 289 foreach (ref b; buf[0 .. n]) 290 b = b.filter!"pull"; 291 return n; 292 } 293 } 294 295 @pullSink!ulong @peekSource!ulong 296 struct TestPullPeekFilter(Source) { 297 mixin TestStage; 298 Source source; 299 const(ulong)[] peek(size_t n) 300 { 301 auto buf = new ulong[n]; 302 size_t m = source.pull(buf[]); 303 foreach (ref b; buf[0 .. m]) 304 b = b.filter!"pullPeek"; 305 return buf[0 .. m]; 306 } 307 void consume(size_t n) {} 308 } 309 310 @pullSink!ulong @pushSource!ulong 311 struct TestPullPushFilter(Source, Sink) { 312 mixin TestStage; 313 Source source; 314 Sink sink; 315 void run()() 316 { 317 for (;;) { 318 ulong[4096] buf; 319 auto n = source.pull(buf[]); 320 foreach (ref b; buf[0 .. n]) 321 b = b.filter!"pullPush"; 322 if (sink.push(buf[0 .. n]) < 4096) 323 break; 324 } 325 } 326 } 327 328 @pullSink!ulong @allocSource!ulong 329 struct TestPullAllocFilter(Source, Sink) { 330 mixin TestStage; 331 Source source; 332 Sink sink; 333 void run()() 334 { 335 for (;;) { 336 ulong[] buf; 337 if (!sink.alloc(buf, 4096)) 338 assert(0); 339 auto n = source.pull(buf[]); 340 foreach (ref b; buf[0 .. n]) 341 b = b.filter!"pullAlloc"; 342 if (sink.commit(n) < 4096) 343 break; 344 } 345 } 346 } 347 348 @pushSink!ulong @pushSource!ulong 349 struct TestPushFilter(Sink) { 350 mixin TestStage; 351 Sink sink; 352 size_t push(const(ulong)[] buf) 353 { 354 return sink.push(buf.map!(filter!"push").array()); 355 } 356 } 357 358 @pushSink!ulong @allocSource!ulong 359 struct TestPushAllocFilter(Sink) { 360 mixin TestStage; 361 Sink sink; 362 size_t push(const(ulong)[] buf) 363 out(result) { assert(result <= buf.length); } 364 body 365 { 366 ulong[] ob; 367 if (!sink.alloc(ob, buf.length)) 368 assert(0); 369 auto len = min(buf.length, ob.length); 370 buf.take(len).map!(filter!"pushAlloc").copy(ob); 371 return sink.commit(len); 372 } 373 } 374 375 @pushSink!ulong @pullSource!ulong 376 struct TestPushPullFilter(alias Scheduler) { 377 mixin Scheduler; 378 mixin TestStage; 379 ulong[] buffer; 380 381 size_t push(const(ulong)[] buf) 382 { 383 buffer ~= buf.map!(filter!"pushPull").array(); 384 if (yield()) 385 return 0; 386 return buf.length; 387 } 388 389 size_t pull(ulong[] buf) 390 { 391 size_t n = buf.length; 392 while (buffer.length < n) { 393 if (yield()) 394 break; 395 } 396 size_t len = min(n, buffer.length); 397 buf[0 .. len] = buffer[0 .. len]; 398 buffer = buffer[len .. $]; 399 return len; 400 } 401 } 402 403 @pushSink!ulong @peekSource!ulong 404 struct TestPushPeekFilter(alias Scheduler) { 405 mixin Scheduler; 406 mixin TestStage; 407 ulong[] buffer; 408 409 size_t push(const(ulong)[] buf) 410 { 411 buffer ~= buf.map!(filter!"pushPeek").array(); 412 if (yield()) 413 return 0; 414 return buf.length; 415 } 416 417 const(ulong)[] peek(size_t n) 418 { 419 while (buffer.length < n) { 420 if (yield()) 421 break; 422 } 423 return buffer; 424 } 425 426 void consume(size_t n) 427 { 428 buffer = buffer[n .. $]; 429 } 430 } 431 432 @allocSink!ulong @allocSource!ulong 433 struct TestAllocFilter(Sink) { 434 mixin TestStage; 435 Sink sink; 436 ulong[] buf; 437 438 bool alloc(ref ulong[] buf, size_t n) 439 { 440 auto r = sink.alloc(buf, n); 441 this.buf = buf; 442 return r; 443 } 444 445 size_t commit(size_t n) 446 { 447 foreach (ref b; buf[0 .. n]) 448 b = b.filter!"alloc"; 449 return sink.commit(n); 450 } 451 } 452 453 @allocSink!ulong @pushSource!ulong 454 struct TestAllocPushFilter(Sink) { 455 mixin TestStage; 456 Sink sink; 457 ulong[] buffer; 458 459 bool alloc(ref ulong[] buf, size_t n) 460 { 461 buffer = buf = new ulong[n]; 462 return true; 463 } 464 465 size_t commit(size_t n) 466 { 467 size_t m = sink.push(buffer[0 .. n].map!(filter!"allocPush").array()); 468 buffer = buffer[m .. $]; 469 return m; 470 } 471 } 472 473 @allocSink!ulong @pullSource!ulong 474 struct TestAllocPullFilter(alias Scheduler) { 475 mixin Scheduler; 476 mixin TestStage; 477 ulong[] buffer; 478 size_t readOffset; 479 size_t writeOffset; 480 481 bool alloc(ref ulong[] buf, size_t n) 482 { 483 buffer.length = writeOffset + n; 484 buf = buffer[writeOffset .. $]; 485 return true; 486 } 487 488 size_t commit(size_t n) 489 { 490 foreach (ref b; buffer[writeOffset .. writeOffset + n]) 491 b = b.filter!"allocPull"; 492 writeOffset += n; 493 if (yield()) 494 return 0; 495 return n; 496 } 497 498 size_t pull(ulong[] buf) 499 { 500 size_t n = buf.length; 501 while (writeOffset - readOffset < n) { 502 if (yield()) 503 break; 504 } 505 size_t len = min(n, writeOffset - readOffset); 506 buf[0 .. len] = buffer[readOffset .. readOffset + len]; 507 readOffset += len; 508 return len; 509 } 510 } 511 512 @allocSink!ulong @peekSource!ulong 513 struct TestAllocPeekFilter(alias Scheduler) { 514 mixin Scheduler; 515 mixin TestStage; 516 ulong[] buffer; 517 size_t readOffset; 518 size_t writeOffset; 519 520 bool alloc(ref ulong[] buf, size_t n) 521 { 522 buffer.length = writeOffset + n; 523 buf = buffer[writeOffset .. $]; 524 return true; 525 } 526 527 size_t commit(size_t n) 528 { 529 foreach (ref b; buffer[writeOffset .. writeOffset + n]) 530 b = b.filter!"allocPeek"; 531 writeOffset += n; 532 if (yield()) 533 return 0; 534 return n; 535 } 536 537 const(ulong)[] peek(size_t n) 538 { 539 while (writeOffset - readOffset < n) { 540 if (yield()) 541 break; 542 } 543 return buffer[readOffset .. writeOffset]; 544 } 545 546 void consume(size_t n) 547 { 548 readOffset += n; 549 } 550 } 551 552 string genStage(string filter, string suf) 553 { 554 import std.ascii : toUpper; 555 auto cf = filter[0].toUpper ~ filter[1 .. $]; 556 return "pipe!Test" ~ cf ~ suf ~ "(Arg!Test" ~ cf ~ suf ~ "())"; 557 } 558 559 string genChain(string filterList) 560 { 561 import std.algorithm : map; 562 import std.array : join, split; 563 auto filters = filterList.split(","); 564 string midstr; 565 if (filters.length > 2) 566 midstr = filters[1 .. $ - 1].map!(f => "." ~ genStage(f, "Filter")).join; 567 return genStage(filters[0], "Source") 568 ~ midstr 569 ~ "." ~ genStage(filters[$ - 1], "Sink") ~ ";"; 570 } 571 572 void testChain(string filterlist, R)(R r) 573 if (isInputRange!R && is(ElementType!R : ulong)) 574 { 575 auto input = r.map!(a => ulong(a)).array(); 576 logf("Testing %s with %d elements", filterlist, input.length); 577 auto expectedOutput = input.dup; 578 auto filters = filterlist.split(","); 579 if (filters.length > 2) { 580 foreach (filter; filters[1 .. $ - 1]) { 581 auto fm = filterMark(filter); 582 foreach (ref eo; expectedOutput) 583 eo = (eo << 4) | fm; 584 } 585 } 586 foreach(expectedLength; [ size_t(0), input.length / 3, input.length - 1, input.length, 587 input.length + 1, input.length * 5 ]) { 588 outputArray.length = expectedLength; 589 outputArray[] = 0xbadc0ffee0ddf00d; 590 inputArray = input; 591 outputIndex = 0; 592 mixin(genChain(filterlist)); 593 auto len = min(outputIndex, expectedLength, input.length); 594 uint left = 8; 595 size_t all = 0; 596 if (outputIndex != min(expectedLength, input.length)) { 597 errorf("Output length is %d, expected %d", outputIndex, min(expectedLength, input.length)); 598 assert(0); 599 } 600 for (size_t i = 0; i < len; i++) { 601 if (expectedOutput[i] != outputArray[i]) { 602 if (left > 0) { 603 logf("expected[%d] != output[%d]: %x vs. %x", i, i, expectedOutput[i], outputArray[i]); 604 --left; 605 } 606 all++; 607 } 608 } 609 if (all > 0) { 610 logf("%s", genChain(filterlist)); 611 logf("total: %d differences", all); 612 } 613 assert(all == 0); 614 } 615 } 616 617 void testChain(string filterlist)() 618 { 619 import std.range : iota; 620 testChain!filterlist(iota(0, 173447)); 621 } 622 623 } 624 625 struct SinkDrivenFiberScheduler { 626 import std.stdio; 627 import core.thread : Fiber; 628 Fiber fiber; 629 mixin NonCopyable; 630 631 void stop() 632 { 633 auto f = this.fiber; 634 if (f) { 635 if (f.state == Fiber.State.HOLD) { 636 this.fiber = null; 637 f.call(); 638 } 639 auto x = f.state; 640 assert(f.state == Fiber.State.TERM); 641 } 642 } 643 644 int yield() 645 { 646 if (fiber is null) 647 return 2; 648 if (fiber.state == Fiber.State.EXEC) { 649 Fiber.yield(); 650 return fiber is null; 651 } else { 652 if (fiber.state == Fiber.State.HOLD) 653 fiber.call(); 654 return fiber.state != Fiber.State.HOLD; 655 } 656 } 657 } 658 659 mixin template FiberScheduler() { 660 import flod.pipeline; 661 SinkDrivenFiberScheduler _flod_scheduler; 662 663 int yield() { return _flod_scheduler.yield(); } 664 void spawn(void delegate() dg) 665 { 666 import core.thread : Fiber; 667 if (!_flod_scheduler.fiber) 668 _flod_scheduler.fiber = new Fiber(dg, 65536); 669 } 670 void stop() { _flod_scheduler.stop(); } 671 } 672 673 // forwards all calls to its impl 674 private struct Forward(S, Flag!"readFromSink" readFromSink = No.readFromSink) { 675 enum name = .str!S; 676 S _impl; 677 this(Args...)(auto ref Args args) 678 { 679 static if (Args.length > 0) 680 _impl.__ctor(args); 681 } 682 ~this() 683 { 684 debug(FlodTraceLifetime) { 685 import std.experimental.logger : tracef; 686 tracef("Destroy %s", name); 687 } 688 } 689 @property ref auto sink()() { return _impl.sink; } 690 @property ref auto source()() { return _impl.source; } 691 static if (readFromSink) { 692 // active source needs to pass pull calls to any push-pull filter at the end of 693 // the active source chain, so that an inverter wrapping the whole chain can recover 694 // data sunk into the filter. 695 auto peek()(size_t n) { return _impl.sink.peek(n); } 696 void consume()(size_t n) { _impl.sink.consume(n); } 697 auto pull(T)(T[] buf) { return _impl.sink.pull(buf); } 698 void spawn()(void delegate() dg) { return _impl.sink.spawn(dg); } 699 void stop()() { return _impl.sink.stop(); } 700 } else { 701 auto peek()(size_t n) { return _impl.peek(n); } 702 void consume()(size_t n) { _impl.consume(n); } 703 auto pull(T)(T[] buf) { return _impl.pull(buf); } 704 void spawn()(void delegate() dg) { return _impl.spawn(dg); } 705 void stop()() { return _impl.stop(); } 706 } 707 auto run()() { return _impl.run(); } 708 auto step(A...)(auto ref A a) { return _impl.step(a); } 709 auto alloc(T)(ref T[] buf, size_t n) { return _impl.alloc(buf, n); } 710 auto commit()(size_t n) { return _impl.commit(n); } 711 auto push(T)(const(T)[] buf) { return _impl.push(buf); } 712 } 713 714 private template Inverter(alias method, T) { 715 @method 716 struct Inverter(Source) { 717 import core.thread : Fiber; 718 719 Source source; 720 721 ~this() 722 { 723 source.sink.stop(); 724 } 725 726 void run() 727 { 728 source.run(); 729 } 730 731 size_t pull()(T[] buf) 732 { 733 source.sink.spawn(&run); 734 return source.sink.pull(buf); 735 } 736 737 const(T)[] peek()(size_t n) 738 { 739 source.sink.spawn(&run); 740 return source.sink.peek(n); 741 } 742 743 void consume()(size_t n) { source.sink.consume(n); } 744 } 745 } 746 747 private void constructInPlace(T, Args...)(ref T t, auto ref Args args) 748 { 749 debug(FlodTraceLifetime) { 750 import std.experimental.logger : tracef; 751 tracef("Construct at %x..%x %s with %s", &t, &t + 1, T.name, Args.stringof); 752 } 753 static if (__traits(hasMember, t, "__ctor")) { 754 t.__ctor(args); 755 } else static if (Args.length > 0) { 756 static assert(0, "Stage " ~ str!T ~ " does not have a non-trivial constructor" ~ 757 " but construction was requested with arguments " ~ Args.stringof); 758 } 759 } 760 761 private struct Pipeline(alias S, SoP, SiP, A...) { 762 alias Stage = S; 763 alias Args = A; 764 alias SourcePipeline = SoP; 765 alias SinkPipeline = SiP; 766 767 enum bool hasSource = !is(SourcePipeline == typeof(null)); 768 enum bool hasSink = !is(SinkPipeline == typeof(null)); 769 770 static if (hasSource) { 771 alias FirstStage = SourcePipeline.FirstStage; 772 enum sourcePipeStr = SourcePipeline.pipeStr ~ "->"; 773 enum sourceTreeStr(int indent) = SourcePipeline.treeStr!(indent + 1) ~ "\n"; 774 enum sourceStr = SourcePipeline.str ~ "."; 775 } else { 776 alias FirstStage = Stage; 777 enum sourcePipeStr = ""; 778 enum sourceTreeStr(int indent) = ""; 779 enum sourceStr = ""; 780 } 781 782 static if (hasSink) { 783 alias LastStage = SinkPipeline.LastStage; 784 enum sinkPipeStr = "->" ~ SinkPipeline.pipeStr; 785 enum sinkTreeStr(int indent) = "\n" ~ SinkPipeline.treeStr!(indent + 1); 786 enum sinkStr = "." ~ SinkPipeline.str; 787 } else { 788 alias LastStage = Stage; 789 enum sinkPipeStr = ""; 790 enum sinkTreeStr(int indent) = ""; 791 enum sinkStr = ""; 792 } 793 794 static if (is(Traits!LastStage.SourceElementType W)) 795 alias ElementType = W; 796 797 enum pipeStr = sourcePipeStr ~ .str!S ~ sinkPipeStr; 798 enum treeStr(int indent) = sourceTreeStr!indent 799 ~ "|" ~ repeat!(indent, "-") ~ "-" ~ .str!S 800 ~ sinkTreeStr!indent; 801 enum str = "(" ~ sourceStr ~ .str!Stage ~ sinkStr ~ ")"; 802 803 static if (hasSink && is(SinkPipeline.Type T)) 804 alias SinkType = T; 805 static if (hasSource && is(SourcePipeline.Type U)) 806 alias SourceType = U; 807 808 static if (isActiveSource!Stage && isActiveSink!Stage && is(SinkType) && is(SourceType)) 809 alias Type = Forward!(Stage!(SourceType, SinkType)); 810 else static if (isActiveSource!Stage && !isActiveSink!Stage && is(SinkType)) 811 alias Type = Forward!(Stage!SinkType, Yes.readFromSink); 812 else static if (!isActiveSource!Stage && isActiveSink!Stage && is(SourceType)) 813 alias Type = Forward!(Stage!SourceType); 814 else static if (isPassiveSink!Stage && isPassiveSource!Stage && is(Stage!FiberScheduler SF)) 815 alias Type = Forward!SF; 816 else static if (is(Stage)) 817 alias Type = Forward!Stage; 818 else static if (isPassiveSource!Stage && !isSink!Stage && is(SourceType)) // inverter 819 alias Type = Forward!(Stage!SourceType); 820 821 SourcePipeline sourcePipeline; 822 SinkPipeline sinkPipeline; 823 Args args; 824 825 auto pipe(alias NextStage, NextArgs...)(auto ref NextArgs nextArgs) 826 { 827 alias SourceE = Traits!LastStage.SourceElementType; 828 alias SinkE = Traits!NextStage.SinkElementType; 829 static assert(is(SourceE == SinkE), "Incompatible element types: " ~ 830 .str!LastStage ~ " produces " ~ SourceE.stringof ~ ", while " ~ 831 .str!NextStage ~ " expects " ~ SinkE.stringof); 832 833 static if (areCompatible!(LastStage, NextStage)) { 834 static if (isPassiveSource!Stage) { 835 auto result = pipeline!NextStage(this, null, nextArgs); 836 } else { 837 static assert(isActiveSource!Stage); 838 static if (isPassiveSink!NextStage && isPassiveSource!NextStage) { 839 static if (isPassiveSink!Stage) { 840 static assert(!hasSource); 841 static if (hasSink) { 842 auto result = pipeline!Stage(null, sinkPipeline.pipe!NextStage(nextArgs), args); 843 } else { 844 auto result = pipeline!Stage(null, pipeline!NextStage(null, null, nextArgs), args); 845 } 846 } else { 847 static if (hasSink) { 848 auto result = pipeline!(Inverter!(sourceMethod!NextStage, SourceE))( 849 pipeline!Stage(sourcePipeline, sinkPipeline.pipe!NextStage(nextArgs), args), 850 null); 851 } else { 852 auto result = pipeline!(Inverter!(sourceMethod!NextStage, SourceE))( 853 pipeline!Stage(sourcePipeline, pipeline!NextStage(null, null, nextArgs), args), 854 null); 855 } 856 } 857 } else { 858 static if (hasSink) 859 auto result = pipeline!Stage(sourcePipeline, sinkPipeline.pipe!NextStage(nextArgs), args); 860 else 861 auto result = pipeline!Stage(sourcePipeline, pipeline!NextStage(null, null, nextArgs), args); 862 } 863 } 864 static if (isSource!NextStage || isSink!FirstStage) 865 return result; 866 else 867 result.run(); 868 } else { 869 import std.string : capitalize; 870 import flod.adapter; 871 enum adapterName = Traits!LastStage.sourceMethodStr ~ Traits!NextStage.sinkMethodStr.capitalize(); 872 mixin(`return this.` ~ adapterName ~ `.pipe!NextStage(nextArgs);`); 873 } 874 } 875 876 static if (is(Type)) { 877 void construct()(ref Type t) 878 { 879 static if (hasSource) 880 sourcePipeline.construct(t.source); 881 static if (hasSink) 882 sinkPipeline.construct(t.sink); 883 constructInPlace(t, args); 884 } 885 } 886 887 static if (!isSink!FirstStage && !isSource!LastStage) { 888 void run()() 889 { 890 Type t; 891 construct(t); 892 t.run(); 893 } 894 } 895 896 static if (!isSink!FirstStage && !isActiveSource!LastStage) { 897 auto create()() 898 { 899 Type t; 900 construct(t); 901 return t; 902 } 903 } 904 } 905 906 auto pipeline(alias Stage, SoP, SiP, A...)(auto ref SoP sourcePipeline, auto ref SiP sinkPipeline, auto ref A args) 907 { 908 return Pipeline!(Stage, SoP, SiP, A)(sourcePipeline, sinkPipeline, args); 909 } 910 911 template isPipeline(P, alias test) { 912 static if (is(P == Pipeline!A, A...)) 913 enum isPipeline = test!(P.LastStage); 914 else 915 enum isPipeline = false; 916 } 917 918 enum isPeekPipeline(P) = isPipeline!(P, isPeekSource); 919 920 enum isPullPipeline(P) = isPipeline!(P, isPullSource); 921 922 enum isPushPipeline(P) = isPipeline!(P, isPushSource); 923 924 enum isAllocPipeline(P) = isPipeline!(P, isAllocSource); 925 926 enum isPipeline(P) = isPushPipeline!P || isPullPipeline!P || isPeekPipeline!P || isAllocPipeline!P; 927 928 auto pipe(alias Stage, Args...)(auto ref Args args) 929 if (isSink!Stage || isSource!Stage) 930 { 931 return pipeline!Stage(null, null, args); 932 } 933 934 unittest { 935 auto p1 = pipe!TestPeekSource(Arg!TestPeekSource()); 936 static assert(isPeekPipeline!(typeof(p1))); 937 static assert(is(p1.ElementType == ulong)); 938 auto p2 = pipe!TestPullSource(Arg!TestPullSource()); 939 static assert(isPullPipeline!(typeof(p2))); 940 static assert(is(p2.ElementType == ulong)); 941 } 942 943 unittest { 944 auto p1 = pipe!TestPushSource(Arg!TestPushSource()); 945 static assert(isPushPipeline!(typeof(p1))); 946 auto p2 = pipe!TestAllocSource(Arg!TestAllocSource()); 947 static assert(isAllocPipeline!(typeof(p2))); 948 } 949 950 unittest { 951 // compatible source-sink pairs 952 testChain!`peek,peek`; 953 testChain!`pull,pull`; 954 testChain!`push,push`; 955 testChain!`alloc,alloc`; 956 } 957 958 unittest { 959 // compatible, with 1 filter 960 testChain!`peek,peek,peek`; 961 testChain!`peek,peekPull,pull`; 962 testChain!`peek,peekPush,push`; 963 testChain!`peek,peekAlloc,alloc`; 964 testChain!`pull,pullPeek,peek`; 965 testChain!`pull,pull,pull`; 966 testChain!`pull,pullPush,push`; 967 testChain!`pull,pullAlloc,alloc`; 968 testChain!`push,pushPeek,peek`; 969 testChain!`push,pushPull,pull`; 970 testChain!`push,push,push`; 971 testChain!`push,pushAlloc,alloc`; 972 testChain!`alloc,allocPeek,peek`; 973 testChain!`alloc,allocPull,pull`; 974 testChain!`alloc,allocPush,push`; 975 testChain!`alloc,alloc,alloc`; 976 } 977 978 unittest { 979 // just one active sink at the end 980 testChain!`peek,peek,peek,peek,peek`; 981 testChain!`peek,peek,peekPull,pull,pull`; 982 testChain!`pull,pull,pull,pull,pull`; 983 testChain!`pull,pull,pullPeek,peek,peek`; 984 } 985 986 unittest { 987 // just one active source at the beginning 988 testChain!`push,push,push,push,push`; 989 testChain!`push,push,pushAlloc,alloc,alloc`; 990 testChain!`alloc,alloc,alloc,alloc,alloc`; 991 testChain!`alloc,alloc,allocPush,push,push`; 992 } 993 994 unittest { 995 // convert passive source to active source, longer chains 996 testChain!`pull,pullPeek,peekAlloc,allocPush,push`; 997 testChain!`pull,pullPeek,peekPush,pushAlloc,alloc`; 998 testChain!`peek,peekPull,pullPush,pushAlloc,alloc`; 999 testChain!`peek,peekPull,pullAlloc,allocPush,push`; 1000 } 1001 1002 unittest { 1003 // convert active source to passive source at stage 2, longer passive source chain 1004 testChain!`push,pushPull,pull,pull,pullPeek,peek,peekPush,push,push`; 1005 } 1006 1007 unittest { 1008 // convert active source to passive source at stage >2 (longer active source chain) 1009 testChain!`push,push,pushPull,pull`; 1010 testChain!`push,push,push,push,push,pushPull,pull`; 1011 testChain!`push,push,pushAlloc,alloc,alloc,allocPeek,peek`; 1012 } 1013 1014 unittest { 1015 // multiple inverters 1016 testChain!`alloc,allocPeek,peekPush,pushPull,pull`; 1017 testChain!`alloc,alloc,alloc,allocPeek,peek,peekPush,push,pushPull,pull`; 1018 testChain!`alloc,alloc,allocPeek,peekPush,pushPull,pull`; 1019 testChain!`alloc,alloc,alloc,allocPeek,peekPush,pushPull,pullPush,push,pushAlloc,alloc,allocPush,pushPeek,peekAlloc,allocPull,pull`; 1020 } 1021 1022 unittest { 1023 // implicit adapters, pull->push 1024 testChain!`pull,push`; 1025 testChain!`pull,push,push`; 1026 testChain!`pull,pushPeek,peek`; 1027 testChain!`pull,pushPull,pull`; 1028 testChain!`pull,pushAlloc,alloc`; 1029 } 1030 1031 unittest { 1032 // implicit adapters, pull->peek 1033 testChain!`pull,peek`; 1034 testChain!`pull,peekPush,push`; 1035 testChain!`pull,peek,peek`; 1036 testChain!`pull,peekPull,pull`; 1037 testChain!`pull,peekAlloc,alloc`; 1038 } 1039 1040 unittest { 1041 // implicit adapters, pull->alloc 1042 testChain!`pull,alloc`; 1043 testChain!`pull,allocPush,push`; 1044 testChain!`pull,allocPeek,peek`; 1045 testChain!`pull,allocPull,pull`; 1046 testChain!`pull,alloc,alloc`; 1047 } 1048 1049 unittest { 1050 // implicit adapters, push->pull 1051 testChain!`push,pull`; 1052 testChain!`push,pullPush,push`; 1053 testChain!`push,pullAlloc,alloc`; 1054 testChain!`push,pullPeek,peek`; 1055 testChain!`push,pull,pull`; 1056 } 1057 1058 unittest { 1059 // implicit adapters, push->peek 1060 testChain!`push,peek`; 1061 testChain!`push,peekPush,push`; 1062 testChain!`push,peekAlloc,alloc`; 1063 testChain!`push,peek,peek`; 1064 testChain!`push,peekPull,pull`; 1065 } 1066 1067 unittest { 1068 // implicit adapters, push->alloc 1069 testChain!`push,alloc`; 1070 testChain!`push,allocPush,push`; 1071 testChain!`push,allocPeek,peek`; 1072 testChain!`push,allocPull,pull`; 1073 testChain!`push,alloc,alloc`; 1074 } 1075 1076 unittest { 1077 // implicit adapters, peek->pull 1078 testChain!`peek,pull`; 1079 testChain!`peek,pullPush,push`; 1080 testChain!`peek,pullAlloc,alloc`; 1081 testChain!`peek,pullPeek,peek`; 1082 testChain!`peek,pull,pull`; 1083 } 1084 1085 unittest { 1086 // implicit adapters, peek->push 1087 testChain!`peek,push`; 1088 testChain!`peek,push,push`; 1089 testChain!`peek,pushAlloc,alloc`; 1090 testChain!`peek,pushPeek,peek`; 1091 testChain!`peek,pushPull,pull`; 1092 } 1093 1094 unittest { 1095 // implicit adapters, peek->alloc 1096 testChain!`peek,alloc`; 1097 testChain!`peek,allocPush,push`; 1098 testChain!`peek,allocPeek,peek`; 1099 testChain!`peek,allocPull,pull`; 1100 testChain!`peek,alloc,alloc`; 1101 } 1102 1103 unittest { 1104 // implicit adapters, alloc->peek 1105 testChain!`alloc,peek`; 1106 testChain!`alloc,peekPush,push`; 1107 testChain!`alloc,peekAlloc,alloc`; 1108 testChain!`alloc,peek,peek`; 1109 testChain!`alloc,peekPull,pull`; 1110 } 1111 1112 unittest { 1113 // implicit adapters, alloc->pull 1114 testChain!`alloc,pull`; 1115 testChain!`alloc,pullPush,push`; 1116 testChain!`alloc,pullAlloc,alloc`; 1117 testChain!`alloc,pullPeek,peek`; 1118 testChain!`alloc,pull,pull`; 1119 } 1120 1121 unittest { 1122 // implicit adapters, alloc->push 1123 testChain!`alloc,push`; 1124 testChain!`alloc,push,push`; 1125 testChain!`alloc,pushAlloc,alloc`; 1126 testChain!`alloc,pushPeek,peek`; 1127 testChain!`alloc,pushPull,pull`; 1128 } 1129 1130 unittest { 1131 // implicit adapters, all in one pipeline 1132 testChain!`alloc,push,peek,pull,alloc,peek,push,pull,peek,alloc,pull,push,peek`; 1133 }