/** Pipeline composition.
 *
 * Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
 * Copyright: © 2016 Adrian Matoga
 * License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
 */
module flod.pipeline;

import std.range : isDynamicArray, isInputRange;
import std.typecons : Flag, Yes, No;

import flod.meta : NonCopyable, str;
import flod.range;
import flod.traits;

version(unittest) {
	import std.algorithm : min, max, map, copy;
	import std.conv : to;
	import std.experimental.logger : logf, errorf;
	import std.range : isInputRange, ElementType, array, take;
	import std.string : split, toLower, startsWith, endsWith;

	// Shared state used by all test stages: sources drain `inputArray`,
	// sinks fill `outputArray` starting at `outputIndex`.
	ulong[] inputArray;
	ulong[] outputArray;
	size_t outputIndex;

	// Computes a 4-bit tag from a filter name (case-insensitive).
	// Bits 0-1 encode the name's prefix and bits 2-3 its suffix:
	// pull = 1, push = 2, alloc = 3; anything else (e.g. peek) leaves the bits at 0.
	uint filterMark(string f) {
		f = f.toLower;
		uint fm;
		if (f.startsWith("pull"))
			fm = 1;
		else if (f.startsWith("push"))
			fm = 2;
		else if (f.startsWith("alloc"))
			fm = 3;
		if (f.endsWith("pull"))
			fm |= 1 << 2;
		else if (f.endsWith("push"))
			fm |= 2 << 2;
		else if (f.endsWith("alloc"))
			fm |= 3 << 2;
		return fm;
	}

	// Transformation applied by test filter `f` to each element:
	// shifts the value left by 4 bits and appends the filter's tag.
	ulong filter(string f)(ulong a) {
		enum fm = filterMark(f);
		return (a << 4) | fm;
	}

	// sources:

	// Constructor argument type unique to each stage template/type `T`.
	struct Arg(alias T) { bool constructed = false; }

	// Boilerplate shared by all test stages: resolves the stage template
	// (or type) the enclosing struct was instantiated from as `Stage`,
	// disables copying, and declares a constructor taking `Arg!Stage`.
	mixin template TestStage(N...) {
		alias This = typeof(this);
		static if (is(This == A!(B, C), alias A, B, C))
			alias Stage = A;
		else static if (is(This == D!(E), alias D, E))
			alias Stage = D;
		else static if (is(This == F!G, alias F, alias G))
			alias Stage = F;
		else static if (is(This))
			alias Stage = This;
		else
			static assert(0, "don't know how to get stage from " ~ This.stringof ~ " (" ~ str!This ~ ")");

		@disable this(this);
		@disable void opAssign(typeof(this));

		// this is to ensure that construct() calls the right constructor for each stage
		this(Arg!Stage arg) { this.arg = arg; this.arg.constructed = true; }
		Arg!Stage arg;
	}

	// Copies as many elements of `inputArray` as fit in the caller's buffer.
	@pullSource!ulong
	struct TestPullSource {
		mixin TestStage;

		size_t pull(ulong[] buf)
		{
			auto len = min(buf.length, inputArray.length);
			buf[0 .. len] = inputArray[0 .. len];
			inputArray = inputArray[len .. $];
			return len;
		}
	}

	// Exposes a window of `inputArray`; the window is at least 2909 elements
	// long (or everything that is left), i.e. it may exceed the requested `n`.
	@peekSource!ulong
	struct TestPeekSource {
		mixin TestStage;

		const(ulong)[] peek(size_t n)
		{
			auto len = min(max(n, 2909), inputArray.length);
			return inputArray[0 .. len];
		}

		void consume(size_t n) { inputArray = inputArray[n .. $]; }
	}

	// Pushes `inputArray` to the sink in chunks of up to 1337 elements and
	// stops as soon as the sink accepts less than a whole chunk.
	@pushSource!ulong
	struct TestPushSource(Sink) {
		mixin TestStage;
		Sink sink;

		void run()()
		{
			while (inputArray.length) {
				auto len = min(1337, inputArray.length);
				if (sink.push(inputArray[0 .. len]) != len)
					break;
				inputArray = inputArray[len .. $];
			}
		}
	}

	// Like TestPushSource, but writes each chunk into a buffer obtained
	// from the sink via alloc() and hands it over with commit().
	@allocSource!ulong
	struct TestAllocSource(Sink) {
		mixin TestStage;
		Sink sink;

		void run()()
		{
			ulong[] buf;
			while (inputArray.length) {
				auto len = min(1337, inputArray.length);
				if (!sink.alloc(buf, len))
					assert(0);
				buf[0 .. len] = inputArray[0 .. len];
				if (sink.commit(len) != len)
					break;
				inputArray = inputArray[len .. $];
			}
		}
	}

	// sinks:

	// Pulls into `outputArray` in chunks of up to 4157 elements until the
	// array is full or the source returns a short read.
	@pullSink!ulong
	struct TestPullSink(Source) {
		mixin TestStage;
		Source source;

		void run()
		{
			while (outputIndex < outputArray.length) {
				auto len = min(4157, outputArray.length - outputIndex);
				auto pd = source.pull(outputArray[outputIndex .. outputIndex + len]);
				outputIndex += pd;
				if (pd < len)
					break;
			}
		}
	}

	// Peeks at the source, copies what it got (capped by the request and by
	// 6379) into `outputArray`, and consumes exactly the copied amount.
	@peekSink!ulong
	struct TestPeekSink(Source) {
		mixin TestStage;
		Source source;

		void run()
		{
			while (outputIndex < outputArray.length) {
				auto len = min(4157, outputArray.length - outputIndex);
				auto ib = source.peek(len);
				auto olen = min(len, ib.length, 6379);
				outputArray[outputIndex .. outputIndex + olen] = ib[0 .. olen];
				outputIndex += olen;
				source.consume(olen);
				if (olen < len)
					break;
			}
		}
	}

	// Accepts pushed data as long as there is room left in `outputArray`;
	// returns the number of elements actually stored.
	@pushSink!ulong
	struct TestPushSink {
		mixin TestStage;

		size_t push(const(ulong)[] buf)
		{
			auto len = min(buf.length, outputArray.length - outputIndex);
			if (len) {
				outputArray[outputIndex .. outputIndex + len] = buf[0 .. len];
				outputIndex += len;
			}
			return len;
		}
	}

	// Hands out slices of `outputArray` directly when the request fits in
	// the remaining space, otherwise a temporary buffer (`last`) which is
	// copied into `outputArray` on commit.
	@allocSink!ulong
	struct TestAllocSink {
		mixin TestStage;
		ulong[] last;

		bool alloc(ref ulong[] buf, size_t n)
		{
			if (n < outputArray.length - outputIndex) {
				buf = outputArray[outputIndex .. outputIndex + n];
				// The data is written in place; forget any earlier temporary
				// buffer so that commit() does not copy stale data over it.
				last = null;
			} else {
				buf = last = new ulong[n];
			}
			return true;
		}

		size_t commit(size_t n)
		out(result) { assert(result <= n); }
		body
		{
			if (!last) {
				// in-place write: only the index needs to advance
				outputIndex += n;
				return n;
			} else {
				auto len = min(n, outputArray.length - outputIndex);
				outputArray[outputIndex .. outputIndex + len] = last[0 .. len];
				outputIndex += len;
				return len;
			}
		}
	}

	// filter

	// peek -> peek: returns a filtered copy of whatever the source exposes.
	@peekSink!ulong @peekSource!ulong
	struct TestPeekFilter(Source) {
		mixin TestStage;
		Source source;
		const(ulong)[] peek(size_t n)
		{
			return source.peek(n).map!(filter!"peek").array();
		}
		void consume(size_t n) { source.consume(n); }
	}

	// peek -> pull: filters peeked data straight into the caller's buffer.
	@peekSink!ulong @pullSource!ulong
	struct TestPeekPullFilter(Source) {
		mixin TestStage;
		Source source;
		size_t pull(ulong[] buf)
		{
			auto ib = source.peek(buf.length);
			auto len = min(ib.length, buf.length);
			ib.take(len).map!(filter!"peekPull").copy(buf);
			source.consume(len);
			return len;
		}
	}

	// peek -> push: drives both sides; stops once the sink accepts fewer
	// than 4096 elements in one push.
	@peekSink!ulong @pushSource!ulong
	struct TestPeekPushFilter(Source, Sink) {
		mixin TestStage;
		Source source;
		Sink sink;
		void run()()
		{
			for (;;) {
				auto ib = source.peek(4096);
				auto ob = ib.map!(filter!"peekPush").array();
				source.consume(ib.length);
				if (sink.push(ob) < 4096)
					break;
			}
		}
	}

	// peek -> alloc: drives both sides; stops once fewer than 4096 elements
	// are committed in one iteration.
	@peekSink!ulong @allocSource!ulong
	struct TestPeekAllocFilter(Source, Sink) {
		mixin TestStage;
		Source source;
		Sink sink;
		void run()()
		{
			ulong[] buf;
			for (;;) {
				auto ib = source.peek(4096);
				if (!sink.alloc(buf, ib.length))
					assert(0);
				auto len = min(ib.length, buf.length);
				ib.take(len).map!(filter!"peekAlloc").copy(buf);
				source.consume(len);
				if (sink.commit(len) < 4096)
					break;
			}
		}
	}

	// pull -> pull: filters the pulled elements in place.
	@pullSink!ulong @pullSource!ulong
	struct TestPullFilter(Source) {
		mixin TestStage;
		Source source;
		size_t pull(ulong[] buf)
		{
			size_t n = source.pull(buf);
			foreach (ref b; buf[0 .. n])
				b = b.filter!"pull";
			return n;
		}
	}

	// pull -> peek: pulls into a fresh buffer on every peek; consume() is a
	// no-op because the pulled data has already left the source.
	@pullSink!ulong @peekSource!ulong
	struct TestPullPeekFilter(Source) {
		mixin TestStage;
		Source source;
		const(ulong)[] peek(size_t n)
		{
			auto buf = new ulong[n];
			size_t m = source.pull(buf[]);
			foreach (ref b; buf[0 .. m])
				b = b.filter!"pullPeek";
			return buf[0 .. m];
		}
		void consume(size_t n) {}
	}

	// pull -> push: drives both sides through a 4096-element stack buffer.
	@pullSink!ulong @pushSource!ulong
	struct TestPullPushFilter(Source, Sink) {
		mixin TestStage;
		Source source;
		Sink sink;
		void run()()
		{
			for (;;) {
				ulong[4096] buf;
				auto n = source.pull(buf[]);
				foreach (ref b; buf[0 .. n])
					b = b.filter!"pullPush";
				if (sink.push(buf[0 .. n]) < 4096)
					break;
			}
		}
	}

	// pull -> alloc: pulls directly into the buffer supplied by the sink.
	@pullSink!ulong @allocSource!ulong
	struct TestPullAllocFilter(Source, Sink) {
		mixin TestStage;
		Source source;
		Sink sink;
		void run()()
		{
			for (;;) {
				ulong[] buf;
				if (!sink.alloc(buf, 4096))
					assert(0);
				auto n = source.pull(buf[]);
				foreach (ref b; buf[0 .. n])
					b = b.filter!"pullAlloc";
				if (sink.commit(n) < 4096)
					break;
			}
		}
	}

	// push -> push: pushes a filtered copy downstream.
	@pushSink!ulong @pushSource!ulong
	struct TestPushFilter(Sink) {
		mixin TestStage;
		Sink sink;
		size_t push(const(ulong)[] buf)
		{
			return sink.push(buf.map!(filter!"push").array());
		}
	}

	// push -> alloc: filters pushed data into a buffer allocated by the sink.
	@pushSink!ulong @allocSource!ulong
	struct TestPushAllocFilter(Sink) {
		mixin TestStage;
		Sink sink;
		size_t push(const(ulong)[] buf)
		out(result) { assert(result <= buf.length); }
		body
		{
			ulong[] ob;
			if (!sink.alloc(ob, buf.length))
				assert(0);
			auto len = min(buf.length, ob.length);
			buf.take(len).map!(filter!"pushAlloc").copy(ob);
			return sink.commit(len);
		}
	}

	// push -> pull: passive on both ends. Pushed data is buffered and
	// control is transferred via the mixed-in scheduler's yield().
	@pushSink!ulong @pullSource!ulong
	struct TestPushPullFilter(alias Scheduler) {
		mixin Scheduler;
		mixin TestStage;
		ulong[] buffer;

		size_t push(const(ulong)[] buf)
		{
			buffer ~= buf.map!(filter!"pushPull").array();
			// a non-zero result from yield() makes push report nothing accepted
			if (yield())
				return 0;
			return buf.length;
		}

		size_t pull(ulong[] buf)
		{
			size_t n = buf.length;
			// keep yielding until enough data is buffered or yield() returns non-zero
			while (buffer.length < n) {
				if (yield())
					break;
			}
			size_t len = min(n, buffer.length);
			buf[0 .. len] = buffer[0 .. len];
			buffer = buffer[len .. $];
			return len;
		}
	}

	// push -> peek: same buffering scheme as TestPushPullFilter, exposing
	// the buffer through peek()/consume().
	@pushSink!ulong @peekSource!ulong
	struct TestPushPeekFilter(alias Scheduler) {
		mixin Scheduler;
		mixin TestStage;
		ulong[] buffer;

		size_t push(const(ulong)[] buf)
		{
			buffer ~= buf.map!(filter!"pushPeek").array();
			if (yield())
				return 0;
			return buf.length;
		}

		const(ulong)[] peek(size_t n)
		{
			while (buffer.length < n) {
				if (yield())
					break;
			}
			return buffer;
		}

		void consume(size_t n)
		{
			buffer = buffer[n .. $];
		}
	}

	// alloc -> alloc: forwards alloc() downstream, remembers the buffer and
	// filters it in place right before forwarding commit().
	@allocSink!ulong @allocSource!ulong
	struct TestAllocFilter(Sink) {
		mixin TestStage;
		Sink sink;
		ulong[] buf;

		bool alloc(ref ulong[] buf, size_t n)
		{
			auto r = sink.alloc(buf, n);
			this.buf = buf;
			return r;
		}

		size_t commit(size_t n)
		{
			foreach (ref b; buf[0 .. n])
				b = b.filter!"alloc";
			return sink.commit(n);
		}
	}

	// alloc -> push: allocates its own buffer and pushes a filtered copy of
	// the committed part downstream.
	@allocSink!ulong @pushSource!ulong
	struct TestAllocPushFilter(Sink) {
		mixin TestStage;
		Sink sink;
		ulong[] buffer;

		bool alloc(ref ulong[] buf, size_t n)
		{
			buffer = buf = new ulong[n];
			return true;
		}

		size_t commit(size_t n)
		{
			size_t m = sink.push(buffer[0 .. n].map!(filter!"allocPush").array());
			buffer = buffer[m .. $];
			return m;
		}
	}

	// alloc -> pull: passive on both ends. Committed data accumulates in
	// `buffer` between `readOffset` and `writeOffset`.
	@allocSink!ulong @pullSource!ulong
	struct TestAllocPullFilter(alias Scheduler) {
		mixin Scheduler;
		mixin TestStage;
		ulong[] buffer;
		size_t readOffset;
		size_t writeOffset;

		bool alloc(ref ulong[] buf, size_t n)
		{
			buffer.length = writeOffset + n;
			buf = buffer[writeOffset .. $];
			return true;
		}

		size_t commit(size_t n)
		{
			foreach (ref b; buffer[writeOffset .. writeOffset + n])
				b = b.filter!"allocPull";
			writeOffset += n;
			if (yield())
				return 0;
			return n;
		}

		size_t pull(ulong[] buf)
		{
			size_t n = buf.length;
			while (writeOffset - readOffset < n) {
				if (yield())
					break;
			}
			size_t len = min(n, writeOffset - readOffset);
			buf[0 .. len] = buffer[readOffset .. readOffset + len];
			readOffset += len;
			return len;
		}
	}

	// alloc -> peek: same buffering scheme as TestAllocPullFilter, exposing
	// the committed region through peek()/consume().
	@allocSink!ulong @peekSource!ulong
	struct TestAllocPeekFilter(alias Scheduler) {
		mixin Scheduler;
		mixin TestStage;
		ulong[] buffer;
		size_t readOffset;
		size_t writeOffset;

		bool alloc(ref ulong[] buf, size_t n)
		{
			buffer.length = writeOffset + n;
			buf = buffer[writeOffset .. $];
			return true;
		}

		size_t commit(size_t n)
		{
			foreach (ref b; buffer[writeOffset .. writeOffset + n])
				b = b.filter!"allocPeek";
			writeOffset += n;
			if (yield())
				return 0;
			return n;
		}

		const(ulong)[] peek(size_t n)
		{
			while (writeOffset - readOffset < n) {
				if (yield())
					break;
			}
			return buffer[readOffset .. writeOffset];
		}

		void consume(size_t n)
		{
			readOffset += n;
		}
	}

	// Builds the source text of a single pipe!() call for the test stage
	// named "Test" ~ capitalized(filter) ~ suf, passing it a matching Arg.
	string genStage(string filter, string suf)
	{
		import std.ascii : toUpper;
		auto cf = filter[0].toUpper ~ filter[1 .. $];
		return "pipe!Test" ~ cf ~ suf ~ "(Arg!Test" ~ cf ~ suf ~ "())";
	}

	// Builds the source text of a whole pipeline from a comma-separated
	// list: the first entry is a Source, the last a Sink, the rest Filters.
	string genChain(string filterList)
	{
		import std.algorithm : map;
		import std.array : join, split;
		auto filters = filterList.split(",");
		string midstr;
		if (filters.length > 2)
			midstr = filters[1 .. $ - 1].map!(f => "." ~ genStage(f, "Filter")).join;
		return genStage(filters[0], "Source")
			~ midstr
			~ "." ~ genStage(filters[$ - 1], "Sink") ~ ";";
	}

	// Runs the pipeline described by `filterlist` over the elements of `r`
	// for several output capacities and verifies both the number of
	// elements delivered and their (filter-tagged) values.
	void testChain(string filterlist, R)(R r)
		if (isInputRange!R && is(ElementType!R : ulong))
	{
		auto input = r.map!(a => ulong(a)).array();
		logf("Testing %s with %d elements", filterlist, input.length);
		auto expectedOutput = input.dup;
		auto filters = filterlist.split(",");
		if (filters.length > 2) {
			// apply the same transformation each intermediate filter performs
			foreach (filter; filters[1 .. $ - 1]) {
				auto fm = filterMark(filter);
				foreach (ref eo; expectedOutput)
					eo = (eo << 4) | fm;
			}
		}
		foreach(expectedLength; [ size_t(0), input.length / 3, input.length - 1, input.length,
			input.length + 1, input.length * 5 ]) {
			outputArray.length = expectedLength;
			// fill with a recognizable pattern to make untouched slots stand out
			outputArray[] = 0xbadc0ffee0ddf00d;
			inputArray = input;
			outputIndex = 0;
			mixin(genChain(filterlist));
			auto len = min(outputIndex, expectedLength, input.length);
			uint left = 8; // log at most this many individual mismatches
			size_t all = 0;
			if (outputIndex != min(expectedLength, input.length)) {
				errorf("Output length is %d, expected %d", outputIndex, min(expectedLength, input.length));
				assert(0);
			}
			for (size_t i = 0; i < len; i++) {
				if (expectedOutput[i] != outputArray[i]) {
					if (left > 0) {
						logf("expected[%d] != output[%d]: %x vs. %x", i, i, expectedOutput[i], outputArray[i]);
						--left;
					}
					all++;
				}
			}
			if (all > 0) {
				logf("%s", genChain(filterlist));
				logf("total: %d differences", all);
			}
			assert(all == 0);
		}
	}

	// Convenience overload: runs the chain over the integers 0 .. 173446.
	void testChain(string filterlist)()
	{
		import std.range : iota;
		testChain!filterlist(iota(0, 173447));
	}

}

/// Transfers control between the code running inside `fiber` and the code
/// outside of it. The fiber itself is assigned externally (see `FiberScheduler.spawn`).
struct SinkDrivenFiberScheduler {
	import core.thread : Fiber;
	Fiber fiber;
	mixin NonCopyable;

	/// Detaches the fiber and, if it is suspended, resumes it once;
	/// the fiber is expected to have terminated afterwards.
	void stop()
	{
		auto f = this.fiber;
		if (f) {
			if (f.state == Fiber.State.HOLD) {
				// clear the reference first so that yield() calls made from
				// inside the fiber report a non-zero result
				this.fiber = null;
				f.call();
			}
			assert(f.state == Fiber.State.TERM);
		}
	}

	/// Switches to the other side.
	/// Returns: 2 if there is no fiber; when called from inside the running
	/// fiber, non-zero iff the fiber reference was cleared while suspended;
	/// otherwise non-zero iff the fiber is not in the HOLD state after
	/// (possibly) being resumed.
	int yield()
	{
		if (fiber is null)
			return 2;
		if (fiber.state == Fiber.State.EXEC) {
			Fiber.yield();
			return fiber is null;
		} else {
			if (fiber.state == Fiber.State.HOLD)
				fiber.call();
			return fiber.state != Fiber.State.HOLD;
		}
	}
}

/// Mixes a `SinkDrivenFiberScheduler` into a stage together with
/// `yield`, `spawn` and `stop` members that delegate to it.
mixin template FiberScheduler() {
	import flod.pipeline;
	SinkDrivenFiberScheduler _flod_scheduler;

	int yield() { return _flod_scheduler.yield(); }
	/// Creates the fiber running `dg` (with a 65536-byte stack) unless one already exists.
	void spawn(void delegate() dg)
	{
		import core.thread : Fiber;
		if (!_flod_scheduler.fiber)
			_flod_scheduler.fiber = new Fiber(dg, 65536);
	}
	void stop() { _flod_scheduler.stop(); }
}

// forwards all calls to its impl
private struct Forward(S, Flag!"readFromSink" readFromSink = No.readFromSink) {
	enum name = .str!S;
	S _impl;
	// Constructs the wrapped stage only if arguments were supplied.
	this(Args...)(auto ref Args args)
	{
		static if (Args.length > 0)
			_impl.__ctor(args);
	}
	~this()
	{
		debug(FlodTraceLifetime) {
			import std.experimental.logger : tracef;
			tracef("Destroy %s", name);
		}
	}
	@property ref auto sink()() { return _impl.sink; }
	@property ref auto source()() { return _impl.source; }
	static if (readFromSink) {
		// active source needs to pass pull calls to any push-pull filter at the end of
		// the active source chain, so that an inverter wrapping the whole chain can recover
		// data sunk into the filter.
		auto peek()(size_t n) { return _impl.sink.peek(n); }
		void consume()(size_t n) { _impl.sink.consume(n); }
		auto pull(T)(T[] buf) { return _impl.sink.pull(buf); }
		void spawn()(void delegate() dg) { return _impl.sink.spawn(dg); }
		void stop()() { return _impl.sink.stop(); }
	} else {
		auto peek()(size_t n) { return _impl.peek(n); }
		void consume()(size_t n) { _impl.consume(n); }
		auto pull(T)(T[] buf) { return _impl.pull(buf); }
		void spawn()(void delegate() dg) { return _impl.spawn(dg); }
		void stop()() { return _impl.stop(); }
	}
	auto run()() { return _impl.run(); }
	auto step(A...)(auto ref A a) { return _impl.step(a); }
	auto alloc(T)(ref T[] buf, size_t n) { return _impl.alloc(buf, n); }
	auto commit()(size_t n) { return _impl.commit(n); }
	auto push(T)(const(T)[] buf) { return _impl.push(buf); }
}

// Wraps a chain driven by an active source so that it can be read from
// passively: reads spawn the chain's run() via `source.sink.spawn` and then
// fetch data from `source.sink`. `method` is attached to the struct as a UDA.
private template Inverter(alias method, T) {
	@method
	struct Inverter(Source) {
		import core.thread : Fiber;

		Source source;

		~this()
		{
			source.sink.stop();
		}

		void run()
		{
			source.run();
		}

		size_t pull()(T[] buf)
		{
			source.sink.spawn(&run);
			return source.sink.pull(buf);
		}

		const(T)[] peek()(size_t n)
		{
			source.sink.spawn(&run);
			return source.sink.peek(n);
		}

		void consume()(size_t n) { source.sink.consume(n); }
	}
}

// Calls `t`'s constructor with `args` if it has one; it is a compile-time
// error to pass arguments to a stage without a constructor.
private void constructInPlace(T, Args...)(ref T t, auto ref Args args)
{
	debug(FlodTraceLifetime) {
		import std.experimental.logger : tracef;
		tracef("Construct at %x..%x %s with %s", &t, &t + 1, T.name, Args.stringof);
	}
	static if (__traits(hasMember, t, "__ctor")) {
		t.__ctor(args);
	} else static if (Args.length > 0) {
		static assert(0, "Stage " ~ str!T ~ " does not have a non-trivial constructor" ~
			" but construction was requested with arguments " ~ Args.stringof);
	}
}

// Compile-time description of a pipeline: stage `S`, the pipelines feeding
// it (`SoP`) and fed by it (`SiP`) -- `typeof(null)` when absent -- and the
// constructor arguments `A` to be passed to the stage when it is built.
private struct Pipeline(alias S, SoP, SiP, A...) {
	alias Stage = S;
	alias Args = A;
	alias SourcePipeline = SoP;
	alias SinkPipeline = SiP;

	enum bool hasSource = !is(SourcePipeline == typeof(null));
	enum bool hasSink = !is(SinkPipeline == typeof(null));

	static if (hasSource) {
		alias FirstStage = SourcePipeline.FirstStage;
		enum sourcePipeStr = SourcePipeline.pipeStr ~ "->";
		enum sourceTreeStr(int indent) = SourcePipeline.treeStr!(indent + 1) ~ "\n";
		enum sourceStr = SourcePipeline.str ~ ".";
	} else {
		alias FirstStage = Stage;
		enum sourcePipeStr = "";
		enum sourceTreeStr(int indent) = "";
		enum sourceStr = "";
	}

	static if (hasSink) {
		alias LastStage = SinkPipeline.LastStage;
		enum sinkPipeStr = "->" ~ SinkPipeline.pipeStr;
		enum sinkTreeStr(int indent) = "\n" ~ SinkPipeline.treeStr!(indent + 1);
		enum sinkStr = "." ~ SinkPipeline.str;
	} else {
		alias LastStage = Stage;
		enum sinkPipeStr = "";
		enum sinkTreeStr(int indent) = "";
		enum sinkStr = "";
	}

	// Element type produced by the last stage, if it is a source.
	static if (is(Traits!LastStage.SourceElementType W))
		alias ElementType = W;

	// Human-readable descriptions of the pipeline structure.
	enum pipeStr = sourcePipeStr ~ .str!S ~ sinkPipeStr;
	enum treeStr(int indent) = sourceTreeStr!indent
		~ "|" ~ repeat!(indent, "-") ~ "-" ~ .str!S
		~ sinkTreeStr!indent;
	enum str = "(" ~ sourceStr ~ .str!Stage ~ sinkStr ~ ")";

	static if (hasSink && is(SinkPipeline.Type T))
		alias SinkType = T;
	static if (hasSource && is(SourcePipeline.Type U))
		alias SourceType = U;

	// Concrete (instantiable) type of this stage, wrapped in Forward;
	// left undefined when the neighbouring types are not known yet.
	static if (isActiveSource!Stage && isActiveSink!Stage && is(SinkType) && is(SourceType))
		alias Type = Forward!(Stage!(SourceType, SinkType));
	else static if (isActiveSource!Stage && !isActiveSink!Stage && is(SinkType))
		alias Type = Forward!(Stage!SinkType, Yes.readFromSink);
	else static if (!isActiveSource!Stage && isActiveSink!Stage && is(SourceType))
		alias Type = Forward!(Stage!SourceType);
	else static if (isPassiveSink!Stage && isPassiveSource!Stage && is(Stage!FiberScheduler SF))
		alias Type = Forward!SF;
	else static if (is(Stage))
		alias Type = Forward!Stage;
	else static if (isPassiveSource!Stage && !isSink!Stage && is(SourceType)) // inverter
		alias Type = Forward!(Stage!SourceType);

	SourcePipeline sourcePipeline;
	SinkPipeline sinkPipeline;
	Args args;

	// Appends `NextStage` to this pipeline. If the resulting pipeline is
	// complete (its first stage is not a sink and `NextStage` is not a
	// source) it is run immediately; otherwise the extended pipeline is
	// returned. Incompatible stage pairs are bridged with an adapter from
	// flod.adapter selected by name.
	auto pipe(alias NextStage, NextArgs...)(auto ref NextArgs nextArgs)
	{
		alias SourceE = Traits!LastStage.SourceElementType;
		alias SinkE = Traits!NextStage.SinkElementType;
		static assert(is(SourceE == SinkE), "Incompatible element types: " ~
			.str!LastStage ~ " produces " ~ SourceE.stringof ~ ", while " ~
			.str!NextStage ~ " expects " ~ SinkE.stringof);

		static if (areCompatible!(LastStage, NextStage)) {
			static if (isPassiveSource!Stage) {
				// the new stage becomes the root and drives this pipeline
				auto result = pipeline!NextStage(this, null, nextArgs);
			} else {
				static assert(isActiveSource!Stage);
				static if (isPassiveSink!NextStage && isPassiveSource!NextStage) {
					static if (isPassiveSink!Stage) {
						static assert(!hasSource);
						static if (hasSink) {
							auto result = pipeline!Stage(null, sinkPipeline.pipe!NextStage(nextArgs), args);
						} else {
							auto result = pipeline!Stage(null, pipeline!NextStage(null, null, nextArgs), args);
						}
					} else {
						// wrap the whole actively-driven chain in an Inverter
						static if (hasSink) {
							auto result = pipeline!(Inverter!(sourceMethod!NextStage, SourceE))(
								pipeline!Stage(sourcePipeline, sinkPipeline.pipe!NextStage(nextArgs), args),
								null);
						} else {
							auto result = pipeline!(Inverter!(sourceMethod!NextStage, SourceE))(
								pipeline!Stage(sourcePipeline, pipeline!NextStage(null, null, nextArgs), args),
								null);
						}
					}
				} else {
					static if (hasSink)
						auto result = pipeline!Stage(sourcePipeline, sinkPipeline.pipe!NextStage(nextArgs), args);
					else
						auto result = pipeline!Stage(sourcePipeline, pipeline!NextStage(null, null, nextArgs), args);
				}
			}
			static if (isSource!NextStage || isSink!FirstStage)
				return result;
			else
				result.run();
		} else {
			import std.string : capitalize;
			import flod.adapter;
			// e.g. "pull" + "Push" -> adapter function `pullPush`
			enum adapterName = Traits!LastStage.sourceMethodStr ~ Traits!NextStage.sinkMethodStr.capitalize();
			mixin(`return this.` ~ adapterName ~ `.pipe!NextStage(nextArgs);`);
		}
	}

	static if (is(Type)) {
		// Constructs the neighbouring pipelines inside `t`, then `t` itself.
		void construct()(ref Type t)
		{
			static if (hasSource)
				sourcePipeline.construct(t.source);
			static if (hasSink)
				sinkPipeline.construct(t.sink);
			constructInPlace(t, args);
		}
	}

	static if (!isSink!FirstStage && !isSource!LastStage) {
		// Instantiates the complete pipeline and runs it.
		void run()()
		{
			Type t;
			construct(t);
			t.run();
		}
	}

	static if (!isSink!FirstStage && !isActiveSource!LastStage) {
		// Instantiates the pipeline and returns it to the caller.
		auto create()()
		{
			Type t;
			construct(t);
			return t;
		}
	}
}

// Helper deducing the template arguments of `Pipeline` from its call arguments.
private auto pipeline(alias Stage, SoP, SiP, A...)(auto ref SoP sourcePipeline, auto ref SiP sinkPipeline, auto ref A args)
{
	return Pipeline!(Stage, SoP, SiP, A)(sourcePipeline, sinkPipeline, args);
}

// Evaluates `test` on the last stage of `P` if `P` is a `Pipeline`; false otherwise.
private template testPipeline(P, alias test) {
	static if (is(P == Pipeline!A, A...))
		enum testPipeline = test!(P.LastStage);
	else
		enum testPipeline = false;
}

/// True for dynamic arrays and for pipelines whose last stage is a peek source.
enum isPeekPipeline(P) = isDynamicArray!P || testPipeline!(P, isPeekSource);

/// True for input ranges and for pipelines whose last stage is a pull source.
enum isPullPipeline(P) = isInputRange!P || testPipeline!(P, isPullSource);

/// True for pipelines whose last stage is a push source.
enum isPushPipeline(P) = testPipeline!(P, isPushSource);

/// True for pipelines whose last stage is an alloc source.
enum isAllocPipeline(P) = testPipeline!(P, isAllocSource);

/// True if `P` satisfies any of the four pipeline predicates above.
enum isPipeline(P) = isPushPipeline!P || isPullPipeline!P || isPeekPipeline!P || isAllocPipeline!P;

/// Starts a pipeline with `Stage`. If `Stage` is a sink and the first
/// argument is a dynamic array or an input range, that argument is used as
/// the data source and the remaining arguments are passed to the stage.
auto pipe(alias Stage, Args...)(auto ref Args args)
	if (isSink!Stage || isSource!Stage)
{
	static if (isSink!Stage && Args.length > 0 && isDynamicArray!(Args[0]))
		return pipeFromArray(args[0]).pipe!Stage(args[1 .. $]);
	else static if (isSink!Stage && Args.length > 0 && isInputRange!(Args[0]))
		return pipeFromInputRange(args[0]).pipe!Stage(args[1 .. $]);
	else
		return pipeline!Stage(null, null, args);
}

unittest {
	auto p1 = pipe!TestPeekSource(Arg!TestPeekSource());
	static assert(isPeekPipeline!(typeof(p1)));
	static assert(is(p1.ElementType == ulong));
	auto p2 = pipe!TestPullSource(Arg!TestPullSource());
	static assert(isPullPipeline!(typeof(p2)));
	static assert(is(p2.ElementType == ulong));
}

unittest {
	auto p1 = pipe!TestPushSource(Arg!TestPushSource());
	static assert(isPushPipeline!(typeof(p1)));
	auto p2 = pipe!TestAllocSource(Arg!TestAllocSource());
	static assert(isAllocPipeline!(typeof(p2)));
}

unittest {
	// compatible source-sink pairs
	testChain!`peek,peek`;
	testChain!`pull,pull`;
	testChain!`push,push`;
	testChain!`alloc,alloc`;
}

unittest {
	// compatible, with 1 filter
	testChain!`peek,peek,peek`;
	testChain!`peek,peekPull,pull`;
	testChain!`peek,peekPush,push`;
	testChain!`peek,peekAlloc,alloc`;
	testChain!`pull,pullPeek,peek`;
	testChain!`pull,pull,pull`;
	testChain!`pull,pullPush,push`;
	testChain!`pull,pullAlloc,alloc`;
	testChain!`push,pushPeek,peek`;
	testChain!`push,pushPull,pull`;
	testChain!`push,push,push`;
	testChain!`push,pushAlloc,alloc`;
	testChain!`alloc,allocPeek,peek`;
	testChain!`alloc,allocPull,pull`;
	testChain!`alloc,allocPush,push`;
	testChain!`alloc,alloc,alloc`;
}

unittest {
	// just one active sink at the end
	testChain!`peek,peek,peek,peek,peek`;
	testChain!`peek,peek,peekPull,pull,pull`;
	testChain!`pull,pull,pull,pull,pull`;
	testChain!`pull,pull,pullPeek,peek,peek`;
}

unittest {
	// just one active source at the beginning
	testChain!`push,push,push,push,push`;
	testChain!`push,push,pushAlloc,alloc,alloc`;
	testChain!`alloc,alloc,alloc,alloc,alloc`;
	testChain!`alloc,alloc,allocPush,push,push`;
}

unittest {
	// convert passive source to active source, longer chains
	testChain!`pull,pullPeek,peekAlloc,allocPush,push`;
	testChain!`pull,pullPeek,peekPush,pushAlloc,alloc`;
	testChain!`peek,peekPull,pullPush,pushAlloc,alloc`;
	testChain!`peek,peekPull,pullAlloc,allocPush,push`;
}

unittest {
	// convert active source to passive source at stage 2, longer passive source chain
	testChain!`push,pushPull,pull,pull,pullPeek,peek,peekPush,push,push`;
}

unittest {
	// convert active source to passive source at stage >2 (longer active source chain)
	testChain!`push,push,pushPull,pull`;
	testChain!`push,push,push,push,push,pushPull,pull`;
	testChain!`push,push,pushAlloc,alloc,alloc,allocPeek,peek`;
}

unittest {
	// multiple inverters
	testChain!`alloc,allocPeek,peekPush,pushPull,pull`;
	testChain!`alloc,alloc,alloc,allocPeek,peek,peekPush,push,pushPull,pull`;
	testChain!`alloc,alloc,allocPeek,peekPush,pushPull,pull`;
	testChain!`alloc,alloc,alloc,allocPeek,peekPush,pushPull,pullPush,push,pushAlloc,alloc,allocPush,pushPeek,peekAlloc,allocPull,pull`;
}

unittest {
	// implicit adapters, pull->push
	testChain!`pull,push`;
	testChain!`pull,push,push`;
	testChain!`pull,pushPeek,peek`;
	testChain!`pull,pushPull,pull`;
	testChain!`pull,pushAlloc,alloc`;
}

unittest {
	// implicit adapters, pull->peek
	testChain!`pull,peek`;
	testChain!`pull,peekPush,push`;
	testChain!`pull,peek,peek`;
	testChain!`pull,peekPull,pull`;
	testChain!`pull,peekAlloc,alloc`;
}

unittest {
	// implicit adapters, pull->alloc
	testChain!`pull,alloc`;
	testChain!`pull,allocPush,push`;
	testChain!`pull,allocPeek,peek`;
	testChain!`pull,allocPull,pull`;
	testChain!`pull,alloc,alloc`;
}

unittest {
	// implicit adapters, push->pull
	testChain!`push,pull`;
	testChain!`push,pullPush,push`;
	testChain!`push,pullAlloc,alloc`;
	testChain!`push,pullPeek,peek`;
	testChain!`push,pull,pull`;
}

unittest {
	// implicit adapters, push->peek
	testChain!`push,peek`;
	testChain!`push,peekPush,push`;
	testChain!`push,peekAlloc,alloc`;
	testChain!`push,peek,peek`;
	testChain!`push,peekPull,pull`;
}

unittest {
	// implicit adapters, push->alloc
	testChain!`push,alloc`;
	testChain!`push,allocPush,push`;
	testChain!`push,allocPeek,peek`;
	testChain!`push,allocPull,pull`;
	testChain!`push,alloc,alloc`;
}

unittest {
	// implicit adapters, peek->pull
	testChain!`peek,pull`;
	testChain!`peek,pullPush,push`;
	testChain!`peek,pullAlloc,alloc`;
	testChain!`peek,pullPeek,peek`;
	testChain!`peek,pull,pull`;
}

unittest {
	// implicit adapters, peek->push
	testChain!`peek,push`;
	testChain!`peek,push,push`;
	testChain!`peek,pushAlloc,alloc`;
	testChain!`peek,pushPeek,peek`;
	testChain!`peek,pushPull,pull`;
}

unittest {
	// implicit adapters, peek->alloc
	testChain!`peek,alloc`;
	testChain!`peek,allocPush,push`;
	testChain!`peek,allocPeek,peek`;
	testChain!`peek,allocPull,pull`;
	testChain!`peek,alloc,alloc`;
}

unittest {
	// implicit adapters, alloc->peek
	testChain!`alloc,peek`;
	testChain!`alloc,peekPush,push`;
	testChain!`alloc,peekAlloc,alloc`;
	testChain!`alloc,peek,peek`;
	testChain!`alloc,peekPull,pull`;
}

unittest {
	// implicit adapters, alloc->pull
	testChain!`alloc,pull`;
	testChain!`alloc,pullPush,push`;
	testChain!`alloc,pullAlloc,alloc`;
	testChain!`alloc,pullPeek,peek`;
	testChain!`alloc,pull,pull`;
}

unittest {
	// implicit adapters, alloc->push
	testChain!`alloc,push`;
	testChain!`alloc,push,push`;
	testChain!`alloc,pushAlloc,alloc`;
	testChain!`alloc,pushPeek,peek`;
	testChain!`alloc,pushPull,pull`;
}

unittest {
	// implicit adapters, all in one pipeline
	testChain!`alloc,push,peek,pull,alloc,peek,push,pull,peek,alloc,pull,push,peek`;
}

unittest {
	// a dynamic array can be used directly as a peek source
	auto array = [ 1UL, 0xdead, 6 ];
	static assert(isPeekPipeline!(typeof(array)));
	outputArray.length = 4;
	outputIndex = 0;
	array.pipe!TestPeekSink();
	assert(outputArray[0 .. outputIndex] == array[]);
}

unittest {
	// an input range can be used directly as a pull source
	import std.range : iota, array, take;
	import std.algorithm : equal;
	auto r = iota(37UL, 1337);
	static assert(isPullPipeline!(typeof(r)));
	outputArray.length = 5000;
	outputIndex = 0;
	r.pipe!TestPullSink();
	assert(outputArray[0 .. outputIndex] == iota(37, 1337).array());
	r = iota(55UL, 1555);
	outputArray.length = 20;
	outputIndex = 0;
	r.pipe!TestPullSink();
	assert(outputArray[0 .. outputIndex] == iota(55, 1555).take(20).array());
}