/**
This module contains pipes used to convert between ranges and pipelines.

Built-in arrays and input ranges can be used directly as pipeline sources.
For arrays of characters no auto-decoding is performed.

Pipelines can be read from as input ranges by element (`flod.pipeline.Schema.opSlice`),
by chunk (`byChunk`) or by line (`byLine`).

Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
Copyright: © 2016 Adrian Matoga
License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
*/
module flod.range;

import std.range : isInputRange, isOutputRange;
import std.traits : isSomeChar;
import std.typecons : Flag, No;

import flod.pipeline : pipe, isSchema;
import flod.traits;

version(unittest) import std.algorithm : equal, map, filter;

// Peek source over a borrowed array of `E`. The array is sliced, not copied.
// `peek` always returns all remaining elements, regardless of the requested count `n`;
// `consume(n)` drops `n` elements from the front.
private template ArraySource(E) {
	@source!E(Method.peek)
	struct ArraySource(alias Context, A...) {
		mixin Context!A;
		private const(E)[] array;
		this(const(E)* ptr, size_t length)
		{
			this.array = ptr[0 .. length];
		}

		const(E)[] peek()(size_t n) { return array; }
		void consume()(size_t n) { array = array[n .. $]; }
	}
}

// Starts a pipeline whose source is the (borrowed, not copied) `array`.
package auto pipeFromArray(E)(const(E)[] array)
{
	static assert(isPeekSource!(ArraySource!E));
	static assert(isSource!(ArraySource!E));
	return .pipe!(ArraySource!E)(array.ptr, array.length);
}

unittest {
	auto arr = [ 1, 2, 37, 98, 123, 12313 ];
	auto pl = arr.pipeFromArray.instantiate();
	assert(pl.peek(1) == arr[]);
	assert(pl.peek(123) == arr[]);
	pl.consume(2);
	assert(pl.peek(23) == arr[2 .. $]);
	pl.consume(pl.peek(1).length);
	assert(pl.peek(1).length == 0);
}

// Pull source adapting an input range `R`.
// `pull` copies elements from the range into `buf` until either the range or `buf` is
// exhausted, and returns the number of elements written. The `dummy` constructor
// argument is ignored.
private template RangeSource(R) {
	import std.range : ElementType;
	import std.traits;
	alias E = Unqual!(ElementType!R);

	@source!E(Method.pull)
	static struct RangeSource(alias Context, A...) {
		mixin Context!A;
		private R range;

		this(bool dummy, R range) { cast(void) dummy; this.range = range; }

		size_t pull()(E[] buf)
		{
			foreach (i, ref e; buf) {
				if (range.empty)
					return i;
				e = range.front;
				range.popFront();
			}
			return buf.length;
		}
	}
}

// Starts a pipeline whose source pulls elements from the input range `r`.
package auto pipeFromInputRange(R)(R r)
	if (isInputRange!R)
{
	return .pipe!(RangeSource!R)(false, r);
}

unittest {
	import std.range : iota, hasSlicing, hasLength, isInfinite;

	auto r = iota(6, 12);
	static assert( hasSlicing!(typeof(r)));
	static assert( hasLength!(typeof(r)));
	static assert(!isInfinite!(typeof(r)));
	auto p = r.pipeFromInputRange;
	static assert(isSchema!(typeof(p)));
	static assert(is(p.ElementType == int));
	auto pl = p.instantiate();
	int[4] buf;
	assert(pl.pull(buf[]) == 4);
	assert(buf[] == [6, 7, 8, 9]);
	assert(pl.pull(buf[]) == 2);
	assert(buf[0 .. 2] == [10, 11]);
}

unittest {
	import std.range : repeat, hasSlicing, hasLength, isInfinite;

	auto r = repeat(0xdead);
	static assert( hasSlicing!(typeof(r)));
	static assert(!hasLength!(typeof(r)));
	static assert( isInfinite!(typeof(r)));
	auto pl = r.pipeFromInputRange.instantiate();
	int[5] buf;
	assert(pl.pull(buf[]) == 5);
	assert(buf[] == [0xdead, 0xdead, 0xdead, 0xdead, 0xdead]);
	assert(pl.pull(new int[1234567]) == 1234567);
}

unittest {
	import std.range : generate, take, hasSlicing;

	auto r = generate({ int i = 0; return (){ return i++; }; }()).take(104);
	static assert(!hasSlicing!(typeof(r)));
	auto pl = r.pipeFromInputRange.instantiate();
	int[5] buf;
	assert(pl.pull(buf[]) == 5);
	assert(buf[] == [0, 1, 2, 3, 4]);
	assert(pl.pull(new int[1234567]) == 99);
}

// Push sink that writes every pushed slice to the output range `R` with `std.range.put`
// and reports the whole slice as accepted.
private template RangeSink(R) {
	@sink(Method.push)
	static struct RangeSink(alias Context, A...) {
		mixin Context!A;
		private R range;

		alias E = InputElementType;
		static assert(isOutputRange!(R, E));

		this()(R range) { this.range = range; }

		size_t push()(const(E)[] buf)
		{
			import std.range : put;
			put(range, buf);
			return buf.length;
		}
	}
}

/**
Copies all data from the pipeline instantiated from `schema` to output range `target`.

Params:
	schema = Pipeline whose output is copied.
	target = Output range accepting `S.ElementType`; each slice reaching the final stage
		is written to it with `std.range.put`.
Returns:
	The pipeline completed with a sink stage writing to `target`. For a schema started
	with `pass`, this is an object usable as an output range (see the `pass` example).
*/
public auto copy(S, R)(S schema, R target)
	if (isSchema!S && isOutputRange!(R, S.ElementType))
{
	return schema.pipe!(RangeSink!R)(target);
}

unittest {
	import std.array : appender;
	import std.range : iota;

	auto app = appender!(int[]);
	iota(89, 94).pipeFromInputRange.copy(app);
	assert(app.data[] == [89, 90, 91, 92, 93]);
}

// Push source driven by a callable: `run` calls `fun` with a pointer to this stage,
// and every slice `fun` passes to `put` is pushed to the next stage.
private template DelegateSource(alias fun, E) {
	@source!E(Method.push)
	struct DelegateSource(alias Context, A...) {
		mixin Context!A;

		void put()(const(E)[] b)
		{
			sink.push(b);
		}

		void run()()
		{
			fun(&this);
		}
	}
}

// Starts a pipeline whose source is `fun`, writing elements of type `E` (see DelegateSource).
private auto pipeFromDelegate(E, alias fun)()
{
	return pipe!(DelegateSource!(fun, E));
}

unittest {
	import std.format : formattedWrite;
	import std.array : appender;

	auto app = appender!string;
	/* FIXME:
	Fails if the delegate literal passed to pipeFromDelegate accesses the calling function's context.
	Error: function flod.range.__unittestL186_57.DelegateSource!(__lambda1, char).Stage!(...).DelegateSource
	.run!().run cannot access frame of function flod.range.__unittestL186_57
	*/
	static int a = 42;
	pipeFromDelegate!(char, (orange)
	{
		orange.formattedWrite("first line %d\n", a);
		orange.formattedWrite("formatted %012x line\n", 0xdeadbeef);
	})
	.copy(app);
	assert(app.data == "first line 42\nformatted 0000deadbeef line\n");
}

// Push source whose `put` forwards every slice to the next stage, so that the
// instantiated pipeline can be used as an output range (see `pass`).
// NOTE(review): `El = void` presumably lets the framework deduce the element type
// from the following stages - TODO confirm against flod.pipeline.
template OutputRangeSource(El = void) {
	@source!El(Method.push)
	package struct OutputRangeSource(alias Context, A...) {
		mixin Context!A;

		alias E = OutputElementType;

		void put(const(E)[] elements)
		{
			sink.push(elements);
		}
	}
}

/**
Starts a pipeline to be used as an output range.

The first overload returns a schema whose source stage is itself an output range:
once the remaining stages are appended (e.g. with `copy`), the resulting object accepts
elements through `put`.

The second overload instead calls `fun` with an output range; everything `fun` writes
to that range is pushed to the next stage.

Params:
	E = Type of elements accepted by the output range.
	fun = Callable taking the output range as its only argument (second overload only).
Returns:
	A `flod.pipeline.Schema` to which next stages can be appended.
*/
@property auto pass(E = void)()
{
	import flod.pipeline : DriveMode;
	return .pipe!(OutputRangeSource!E, DriveMode.source);
}

version(unittest) {
	// Checks that the pipeline built by the D expression `r` (which may refer to the
	// local `app`) works as an output range accepting formatted text.
	void testOutputRange(string r)()
	{
		import std.array : appender;
		import std.format : formattedWrite;
		import flod.adapter;
		auto app = appender!string();
		{
			auto or = mixin(r);
			or.formattedWrite("test %d\n", 42);
			or.formattedWrite("%s line\n", "second");
		}
		assert(app.data == "test 42\nsecond line\n");
	}
}

unittest {
	testOutputRange!q{ pass!char.copy(app) };
}

unittest {
	// test if this works also with more drivers
	testOutputRange!q{ pass!char.peekAlloc.pullPush.copy(app) };
}

/// ditto
@property auto pass(E, alias fun)()
{
	return .pipe!(DelegateSource!(fun, E));
}

///
unittest {
	import flod : array;
	import std.array : appender;
	import std.format : formattedWrite;
	import std.range : put;

	// Create a pipeline object that can be used as an output range
	auto app = appender!string;
	auto p = pass!char.copy(app);
	put(p, "Hello, world!\n");
	p.formattedWrite("The answer is: %d\n", 42);
	assert(app.data == "Hello, world!\nThe answer is: 42\n");

	// Write to a pipeline from a lambda
	auto s = pass!(char, (r) {
			put(r, "Hello, lambda world!\n");
			r.formattedWrite("The answer is still %d\n", 42);
		})
		.array();
	assert(s == "Hello, lambda world!\nThe answer is still 42\n");

}

unittest {
	import std.algorithm : copy;
	import std.range : iota, array;
	import flod.pipeline : TestPushSink, outputArray, outputIndex, Arg;
	outputArray.length = 5000;
	outputIndex = 0;
	pass!(ulong, (o)
		{
			auto r = iota(42UL, 1024);
			r.copy(o);
		})
		.pipe!TestPushSink(Arg!TestPushSink());
	assert(outputArray[0 .. outputIndex] == iota(42UL, 1024).array());
}

// Sink exposing the pipeline output as an input range of single elements.
//
// Peek input: `front`/`empty` inspect the first peeked element and `popFront` consumes one.
// Pull input: one element at a time is pulled into `current_`. The first element is
// pulled lazily by whichever of `empty`, `front` or `popFront` is called first, so `front`
// is valid without a preceding `empty`, and `popFront` always discards the current front.
@sink(Method.pull)
@sink(Method.peek)
package struct ByElement(alias Context, A...) {
	mixin Context!A;
	private alias E = OutputElementType;

	static if (inputMethod == Method.peek) {
		void popFront()() { source.consume(1); }
		@property E front()() { return source.peek(1)[0]; }
		@property bool empty()() { return source.peek(1).length == 0; }
	} else static if (inputMethod == Method.pull) {
		private E[1] current_;
		private bool empty_;   // true once a pull returned no element
		private bool primed_;  // true once the first element has been pulled

		// Pulls the next element into current_ and records whether the source ran dry.
		private void advance_()()
		{
			primed_ = true;
			empty_ = source.pull(current_[]) != 1;
		}

		@property bool empty()()
		{
			if (!primed_)
				advance_();
			return empty_;
		}

		@property E front()()
		{
			if (!primed_)
				advance_();
			return current_[0];
		}

		void popFront()()
		{
			if (!primed_)
				advance_(); // load the element that is being popped
			advance_();
		}
	} else {
		static assert(0);
	}
}

unittest {
	auto p = [10, 20, 30].pipe!ByElement;
	assert(!p.empty);
	assert(p.front == 10);
	p.popFront();
	assert(p.front == 20);
}

unittest {
	import std.range : iota;
	auto p = iota(42, 50).pipe!ByElement;
	assert(!p.empty);
	assert(p.front == 42);
	p.popFront();
	assert(p.front == 43);
}

// Peek sink splitting a stream of characters (or 1/2/4-byte integers reinterpreted as
// UTF-8/16/32 code units) into lines terminated by a separator.
//
// If `Separator` converts to the code unit type (`Separator : Char`), it is a single code
// unit searched for with `indexOf`. Otherwise it is converted to an array of `Char` and
// matched at every offset. `peekStep` is the number of additional elements requested each
// time the current window does not contain a separator yet.
//
// Each `front` is a slice of the source's peek buffer, valid only until `popFront`.
// The last line (the one not followed by a separator) is flagged by `done` in the
// single-unit case and by `separator` being set to null in the array case.
package template Splitter(Separator, size_t peekStep = 128) {
	@sink(Method.peek)
	struct Splitter(alias Context, A...) {
		mixin Context!A;
	private:
		import std.meta : AliasSeq;
		import std.traits : isSomeChar, isIntegral, Unqual;

		static if (isSomeChar!InputElementType)
			alias Char = Unqual!InputElementType;
		else static if (isIntegral!InputElementType && InputElementType.sizeof <= 4)
			alias Char = AliasSeq!(void, char, wchar, void, dchar)[InputElementType.sizeof];
		static assert(is(Char),
			"Only streams of chars or bytes can be read by line, not " ~ InputElementType.stringof);

		const(Char)[] line;
		bool keepSeparator;
		static if (is(Separator : Char)) {
			Char separator;
			bool done;
		}
		else {
			immutable(Char)[] separator;
		}

		// `Sep` is the argument type; the branch is chosen by the enclosing `Separator`
		// so that it always agrees with how the `separator` field was declared above.
		public this(Sep)(typeof(null) dummy, Sep separator, bool keep)
		{
			// TODO: convert separator to array without GC allocation
			// TODO: optimize for single-char separator
			import std.conv : to;
			this.keepSeparator = keep;
			static if (is(Separator : Char))
				this.separator = separator;
			else
				this.separator = separator.to!(typeof(this.separator));
			next();
		}

		// Consumes the previous line and sets `line` to the next one, including its
		// separator if one was found; `line` becomes null when the input is exhausted.
		void next()()
		{
			if (line.length)
				source.consume(line.length);
			line = cast(typeof(line)) source.peek(peekStep);
			static if (is(Separator : Char)) {
				size_t start = 0;
				for (;;) {
					import std.string : indexOf;
					auto i = line[start .. $].indexOf(separator);
					if (i >= 0) {
						line = line[0 .. start + i + 1];
						return;
					}
					// Not found in the current window: extend it past what was searched.
					start = line.length;
					line = cast(const(Char)[]) source.peek(start + peekStep);
					if (line.length == start) {
						done = true;
						if (line.length == 0)
							line = null;
						return;
					}
				}
			} else {
				for (size_t i = 0; ; i++) {
					if (line.length - i < separator.length) {
						line = cast(typeof(line)) source.peek(line.length + peekStep);
						if (line.length - i < separator.length) {
							separator = null;
							if (line.length == 0)
								line = null;
							return; // we've read everything, and haven't found separator
						}
					}
					if (line[i .. i + separator.length] == separator[]) {
						line = line[0 .. i + separator.length];
						return;
					}
				}
			}
		}

	public:
		@property bool empty()()
		{
			return line is null;
		}

		@property const(Char)[] front()()
		{
			static if (is(Separator : Char))
				return keepSeparator ? line : line[0 .. $ - (done ? 0 : 1)];
			else
				return keepSeparator ? line : line[0 .. $ - separator.length];
		}

		void popFront()()
		{
			static if (is(Separator : Char)) {
				if (done)
					line = null;
				else
					next();
			} else {
				if (separator.length)
					next();
				else
					line = null;
			}
		}
	}
}

unittest {
	import std.string : representation;
	assert("Zażółć gęślą jaźń".pipe!(Splitter!(char, 3))(null, ' ', true)
		.equal(["Zażółć ", "gęślą ", "jaźń"]));
	assert("Zażółć gęślą jaźń".representation.pipe!(Splitter!(char, 3))(null, ' ', true)
		.equal(["Zażółć ", "gęślą ", "jaźń"]));
	assert("Zażółć gęślą jaźń "w.pipe!(Splitter!(dstring, 5))(null, " "d, true)
		.equal(["Zażółć "w, "gęślą "w, "jaźń "w]));
	// map and filter decode the string into a sequence of dchars
	assert("여보세요 세계".map!"a".filter!(a => true).pipe!(Splitter!(string, 2))(null, " ", false)
		.equal(["여보세요"d, "세계"d]));
	assert("Foo\r\nBar\r\nBaz\r\r\n\r\n".pipe!(Splitter!(wstring, 4))(null, "\r\n"w, false)
		.equal(["Foo", "Bar", "Baz\r", ""]));
}

/**
Returns a range that reads from the pipeline one line at a time.

Allowed input element types are built-in character types and integer types of size
1, 2 or 4 bytes. The stream is interpreted as UTF-8, UTF-16 or UTF-32 according to
the input element size. Range elements are arrays of respective built-in character types.

The last line is returned even if it is not followed by a terminator, and a terminator
at the very end of the stream does not produce an extra empty line.

Params:
	schema = Pipeline to read from.
	terminator = Line terminator: a single character (`'\n'` by default) or a range
		of characters, e.g. `"\r\n"`.
	keep_terminator = If `Yes.keepTerminator`, each line includes its terminator.

Each `front` is valid only until `popFront` is called. If retention is needed,
a copy must be made using e.g. `idup` or `to!string`.
*/
auto byLine(S, Terminator)(S schema, Terminator terminator = '\n',
	Flag!"keepTerminator" keep_terminator = No.keepTerminator)
	if (isSchema!S && isSomeChar!Terminator)
{
	return schema.pipe!(Splitter!Terminator)(null, terminator, keep_terminator);
}

/// ditto
auto byLine(S, Terminator)(S schema, Terminator terminator,
	Flag!"keepTerminator" keep_terminator = No.keepTerminator)
	if (isSchema!S && isInputRange!Terminator)
{
	return schema.pipe!(Splitter!Terminator)(null, terminator, keep_terminator);
}

///
unittest {
	import std.uni : toUpper;
	assert("first\nsecond\nthird\n".byLine.equal(["first", "second", "third"]));
	assert("zażółć\r\ngęślą\r\njaźń".map!toUpper.byLine("\r\n").equal(["ZAŻÓŁĆ"d, "GĘŚLĄ"d, "JAŹŃ"d]));
}

unittest {
	import flod.adapter : peekPush;
	assert("first\nsecond\nthird".peekPush.byLine.equal(["first", "second", "third"]));
}

unittest {
	// For arrays of chars byLine should just give slices of the original array.
	// This is not a part of the API, but an implementation detail with positive effect
	// on performance.
	auto line = "just one line";
	assert(line.byLine.front is line);

}

unittest {
	import std.conv : to;
	import std.meta : AliasSeq;
	foreach (T; AliasSeq!(string, wstring, dstring)) {
		assert(q"EOF
Prześliczna dzieweczka na spacer raz szła
Gdy noc ją złapała wietrzysta i zła
Być może przestraszył by ziąb i mrok ją
Lecz miałą wszak mufkę prześliczną swą
EOF".to!T.byLine.equal([
			"Prześliczna dzieweczka na spacer raz szła",
			"Gdy noc ją złapała wietrzysta i zła",
			"Być może przestraszył by ziąb i mrok ją",
			"Lecz miałą wszak mufkę prześliczną swą",
		].map!(to!T)));
	}
}

// Peek sink yielding the stream in chunks.
// With a non-zero `chunkSize`, each chunk holds at most `chunkSize` elements.
// With `chunkSize == 0`, 4096 elements are requested per peek and the returned slice
// is used as-is, whatever its length.
@sink(Method.peek)
private struct PeekByChunk(alias Context, A...) {
	mixin Context!A;
private:
	alias E = InputElementType;

	const(E)[] chunk;
	size_t chunkSize;

public:
	this(size_t chunk_size)
	{
		chunkSize = chunk_size;
		popFront();
	}

	@property bool empty()() { return chunk.length == 0; }

	@property const(E)[] front()() { return chunk; }

	void popFront()()
	{
		if (chunk.length)
			source.consume(chunk.length);
		if (chunkSize) {
			chunk = source.peek(chunkSize);
			if (chunk.length > chunkSize)
				chunk = chunk[0 .. chunkSize]; // peek may return more than requested
		} else {
			chunk = source.peek(4096);
		}
	}
}

// Pull sink yielding the stream in chunks pulled into a caller-supplied buffer.
// Every pull targets the whole buffer, so a short pull does not shrink later chunks.
private template PullByChunk(E) {
	@sink!E(Method.pull)
	struct PullByChunk(alias Context, A...) {
		mixin Context!A;
	private:
		alias E = InputElementType;

		E[] buffer; // the caller-supplied storage; each pull may fill all of it
		E[] chunk;  // the part of `buffer` filled by the most recent pull

	public:
		this(E[] buffer)
		{
			this.buffer = buffer;
			popFront();
		}

		@property bool empty()() { return chunk.length == 0; }

		@property const(E)[] front()() { return chunk; }

		void popFront()()
		{
			chunk = buffer[0 .. source.pull(buffer)];
		}
	}
}

/**
Returns a range that reads from the pipeline one chunk at a time.

Params:
	schema = Pipeline to read from.
	chunk_size = Maximum number of elements per chunk. If 0 (the default), 4096 elements
		are requested from the source each time and whatever it returns forms the chunk.
	buf = Buffer the data is pulled into. Each chunk is a slice of `buf`, so its contents
		are overwritten by the next `popFront`.

Each `front` should be assumed valid only until `popFront` is called; copy it if
it needs to be retained.
*/
auto byChunk(S)(S schema, size_t chunk_size = 0)
	if (isSchema!S)
{
	return schema.pipe!PeekByChunk(chunk_size);
}

/// ditto
auto byChunk(S, E)(S schema, E[] buf)
	if (isSchema!S)
{
	return schema.pipe!(PullByChunk!E)(buf);
}

///
unittest {
	auto arr = [ 42, 41, 40, 39, 38, 37, 36 ];
	assert(arr.byChunk(2).equal([[ 42, 41 ], [ 40, 39 ], [ 38, 37 ], [ 36 ]]));
	int[3] buf;
	assert(arr.byChunk(buf[]).equal([[ 42, 41, 40 ], [ 39, 38, 37 ], [ 36 ]]));
}