1 /**
2 This module contains pipes used to convert between ranges and pipelines.
4 Built-in arrays and input ranges can be used directly as pipeline sources.
5 For arrays of characters no auto-decoding is performed.
7 Pipelines can be read from as input ranges by element (`flod.pipeline.Schema.opSlice`),
8 by chunk (`byChunk`) or by line (`byLine`).
10 Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
11 Copyright: © 2016 Adrian Matoga
12 License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
13 */
14 module flod.range;
16 import std.range : isInputRange, isOutputRange;
17 import std.traits : isSomeChar;
18 import std.typecons : Flag, No;
20 import flod.pipeline : pipe, isSchema;
21 import flod.traits;
23 version(unittest) import std.algorithm : equal, map, filter;
/// Source stage exposing an existing array through the peek/consume protocol.
/// No data is copied: `peek` hands out a slice of the not-yet-consumed part.
private template ArraySource(E) {
	@source!E(Method.peek)
	struct ArraySource(alias Context, A...) {
		mixin Context!A;

		// Elements that have not been consumed yet.
		private const(E)[] array;

		this(const(E)* ptr, size_t length)
		{
			array = ptr[0 .. length];
		}

		/// Returns every remaining element, however many were requested.
		const(E)[] peek()(size_t n)
		{
			return array;
		}

		/// Drops the first `n` remaining elements.
		void consume()(size_t n)
		{
			array = array[n .. $];
		}
	}
}
/// Starts a pipeline that reads directly from `array`, without copying it.
package auto pipeFromArray(E)(const(E)[] array)
{
	alias Stage = ArraySource!E;
	static assert(isSource!Stage);
	static assert(isPeekSource!Stage);
	return .pipe!Stage(array.ptr, array.length);
}
unittest {
	auto data = [ 1, 2, 37, 98, 123, 12313 ];
	auto pl = data.pipeFromArray.instantiate();

	// peek yields everything that is left, regardless of the requested amount
	assert(pl.peek(1) == data[]);
	assert(pl.peek(123) == data[]);

	pl.consume(2);
	assert(pl.peek(23) == data[2 .. $]);

	// once everything is consumed, nothing is left to peek
	immutable rest = pl.peek(1).length;
	pl.consume(rest);
	assert(pl.peek(1).length == 0);
}
private template RangeSource(R) {
	import std.range : ElementType;
	import std.traits;

	alias E = Unqual!(ElementType!R);

	/// Pull source copying elements out of an input range of type `R`.
	@source!E(Method.pull)
	static struct RangeSource(alias Context, A...) {
		mixin Context!A;
		private R range;

		// The `dummy` flag is ignored.
		this(bool dummy, R range)
		{
			cast(void) dummy;
			this.range = range;
		}

		/// Copies elements from the range into `buf`.
		/// Returns: the number of elements written; it is less than `buf.length`
		/// only if the range ran out first.
		size_t pull()(E[] buf)
		{
			size_t n = 0;
			for (; n < buf.length; ++n) {
				if (range.empty)
					break;
				buf[n] = range.front;
				range.popFront();
			}
			return n;
		}
	}
}
/// Starts a pipeline whose elements are taken from input range `r`.
package auto pipeFromInputRange(R)(R r)
	if (isInputRange!R)
{
	alias Stage = RangeSource!R;
	return .pipe!Stage(false, r);
}
unittest {
	import std.range : hasLength, hasSlicing, iota, isInfinite;

	auto numbers = iota(6, 12);
	alias N = typeof(numbers);
	static assert(hasSlicing!N);
	static assert(hasLength!N);
	static assert(!isInfinite!N);

	auto schema = numbers.pipeFromInputRange;
	static assert(isSchema!(typeof(schema)));
	static assert(is(schema.ElementType == int));

	auto pl = schema.instantiate();
	int[4] buf;
	// the first pull fills the whole buffer...
	assert(pl.pull(buf[]) == 4);
	assert(buf[] == [6, 7, 8, 9]);
	// ...the second one only gets what is left
	assert(pl.pull(buf[]) == 2);
	assert(buf[0 .. 2] == [10, 11]);
}
unittest {
	import std.range : hasLength, hasSlicing, isInfinite, repeat;

	auto endless = repeat(0xdead);
	alias R = typeof(endless);
	static assert(hasSlicing!R);
	static assert(!hasLength!R);
	static assert(isInfinite!R);

	auto pl = endless.pipeFromInputRange.instantiate();
	int[5] small;
	assert(pl.pull(small[]) == 5);
	foreach (x; small)
		assert(x == 0xdead);

	// an infinite range always fills the buffer completely
	auto big = new int[1234567];
	assert(pl.pull(big) == 1234567);
}
unittest {
	import std.range : generate, hasSlicing, take;

	// a counter producing 0, 1, 2, ...
	auto makeCounter()
	{
		int next = 0;
		return () => next++;
	}

	auto counted = generate(makeCounter()).take(104);
	static assert(!hasSlicing!(typeof(counted)));

	auto pl = counted.pipeFromInputRange.instantiate();
	int[5] head;
	assert(pl.pull(head[]) == 5);
	assert(head[] == [0, 1, 2, 3, 4]);
	// only 104 - 5 elements remain, even though far more were requested
	assert(pl.pull(new int[1234567]) == 99);
}
private template RangeSink(R) {
	/// Push sink forwarding every chunk it receives to an output range of type `R`.
	@sink(Method.push)
	static struct RangeSink(alias Context, A...) {
		mixin Context!A;

		alias E = InputElementType;
		static assert(isOutputRange!(R, E));

		private R range;

		this()(R range)
		{
			this.range = range;
		}

		/// Writes all of `buf` into the output range.
		/// Returns: `buf.length`, i.e. the whole chunk is always accepted.
		size_t push()(const(E)[] buf)
		{
			import std.range : put;
			put(range, buf);
			return buf.length;
		}
	}
}
/**
Copies all data from the pipeline instantiated from `schema` to output range `target`.

Params:
	schema = pipeline whose elements are written out.
	target = output range accepting elements of type `S.ElementType`.
*/
public auto copy(S, R)(S schema, R target)
	if (isSchema!S && isOutputRange!(R, S.ElementType))
{
	alias Sink = RangeSink!R;
	return schema.pipe!Sink(target);
}
unittest {
	import std.array : appender;
	import std.range : iota;

	auto output = appender!(int[]);
	iota(89, 94).pipeFromInputRange.copy(output);
	assert(output.data[] == [89, 90, 91, 92, 93]);
}
private template DelegateSource(alias fun, E) {
	/// Push source that hands itself to `fun` as an output range; everything
	/// `fun` puts is pushed to the next stage.
	@source!E(Method.push)
	struct DelegateSource(alias Context, A...) {
		mixin Context!A;

		/// Calls `fun` with a pointer to this stage.
		void run()()
		{
			fun(&this);
		}

		/// Output range primitive: forwards `b` downstream.
		/// The value returned by `sink.push` is ignored.
		void put()(const(E)[] b)
		{
			sink.push(b);
		}
	}
}
/// Starts a pipeline fed by `fun`, which receives an output range of `E`.
private auto pipeFromDelegate(E, alias fun)()
{
	alias Stage = DelegateSource!(fun, E);
	return pipe!Stage;
}
unittest {
	import std.array : appender;
	import std.format : formattedWrite;

	/* FIXME:
	Compilation fails if the delegate literal passed to pipeFromDelegate accesses
	the calling function's context:
	Error: function flod.range.__unittestL186_57.DelegateSource!(__lambda1, char).Stage!(...).DelegateSource
		.run!().run cannot access frame of function flod.range.__unittestL186_57
	Presumably that is why `answer` below is `static`.
	*/
	static int answer = 42;
	auto text = appender!string;
	pipeFromDelegate!(char, (w) {
			w.formattedWrite("first line %d\n", answer);
			w.formattedWrite("formatted %012x line\n", 0xdeadbeef);
		})
		.copy(text);
	assert(text.data == "first line 42\nformatted 0000deadbeef line\n");
}
/// Push source whose `put` lets the start of a pipeline be used as an output range.
template OutputRangeSource(El = void) {
	@source!El(Method.push)
	package struct OutputRangeSource(alias Context, A...) {
		mixin Context!A;

		alias E = OutputElementType;

		/// Pushes `elements` to the next stage unchanged.
		void put(const(E)[] elements) { sink.push(elements); }
	}
}
/**
Starts a pipeline that is written to as an output range.

Params:
	E = Type of elements accepted by the output range.

Returns:
	A `flod.pipeline.Schema` to which further stages can be appended.
*/
@property auto pass(E = void)()
{
	import flod.pipeline : DriveMode;
	alias Stage = OutputRangeSource!E;
	return .pipe!(Stage, DriveMode.source);
}
version(unittest) {
	/*
	Mixes in expression `r` to build an output range (the expression may refer to the
	local appender `app`), writes two formatted lines through it and checks that both
	arrived in `app`.
	*/
	void testOutputRange(string r)()
	{
		import flod.adapter;
		import std.array : appender;
		import std.format : formattedWrite;

		enum expected = "test 42\nsecond line\n";
		auto app = appender!string();
		{
			// NOTE(review): the nested scope presumably ends the pipeline's lifetime
			// before `app` is checked — confirm.
			auto w = mixin(r);
			w.formattedWrite("test %d\n", 42);
			w.formattedWrite("%s line\n", "second");
		}
		assert(app.data == expected);
	}
}
unittest {
	// plain pass-through straight into the appender
	testOutputRange!q{ pass!char.copy(app) };
}

unittest {
	// the same, but with additional driver stages in between
	testOutputRange!q{ pass!char.peekAlloc.pullPush.copy(app) };
}
/// ditto
@property auto pass(E, alias fun)()
{
	alias Stage = DelegateSource!(fun, E);
	return .pipe!Stage;
}
///
unittest {
	import flod : array;
	import std.array : appender;
	import std.format : formattedWrite;
	import std.range : put;

	// Create a pipeline object that can be used as an output range
	auto app = appender!string;
	auto output = pass!char.copy(app);
	put(output, "Hello, world!\n");
	output.formattedWrite("The answer is: %d\n", 42);
	assert(app.data == "Hello, world!\nThe answer is: 42\n");

	// Write to a pipeline from a lambda
	auto text = pass!(char, (r) {
			put(r, "Hello, lambda world!\n");
			r.formattedWrite("The answer is still %d\n", 42);
		})
		.array();
	assert(text == "Hello, lambda world!\nThe answer is still 42\n");
}
unittest {
	import std.algorithm : copy;
	import std.range : array, iota;
	import flod.pipeline : Arg, TestPushSink, outputArray, outputIndex;

	auto expected = iota(42UL, 1024).array();
	outputArray.length = 5000;
	outputIndex = 0;
	pass!(ulong, (o) {
			iota(42UL, 1024).copy(o);
		})
		.pipe!TestPushSink(Arg!TestPushSink());
	assert(outputArray[0 .. outputIndex] == expected);
}
/// Sink stage that exposes a pipeline as an input range of single elements.
@sink(Method.pull)
@sink(Method.peek)
package struct ByElement(alias Context, A...) {
	mixin Context!A;
	private alias E = OutputElementType;

	static if (inputMethod == Method.pull) {
		// one-element buffer holding the current front
		private E[1] current_;
		// true before the first pull and whenever the last pull yielded nothing
		private bool empty_ = true;

		@property bool empty()()
		{
			if (empty_)
				popFront(); // first call, or retry after the source came up empty
			return empty_;
		}

		@property E front()() { return current_[0]; }

		void popFront()()
		{
			immutable got = source.pull(current_[]);
			empty_ = got != 1;
		}
	} else static if (inputMethod == Method.peek) {
		@property bool empty()() { return source.peek(1).length == 0; }
		@property E front()() { return source.peek(1)[0]; }
		void popFront()() { source.consume(1); }
	} else {
		static assert(0);
	}
}
unittest {
	auto elements = [10, 20, 30].pipe!ByElement;
	assert(!elements.empty);
	assert(elements.front == 10);
	elements.popFront();
	assert(elements.front == 20);
}

unittest {
	import std.range : iota;

	auto elements = iota(42, 50).pipe!ByElement;
	assert(!elements.empty);
	assert(elements.front == 42);
	elements.popFront();
	assert(elements.front == 43);
}
/**
Sink stage splitting a character stream into pieces (lines) delimited by `Separator`.

`Separator` is either a single character implicitly convertible to the stream's character
type, or something `std.conv.to` can turn into an array of that character type.
`peekStep` is how many more elements are requested from the source each time the
available data runs out before a separator is found.
*/
package template Splitter(Separator, size_t peekStep = 128) {
	@sink(Method.peek)
	struct Splitter(alias Context, A...) {
		mixin Context!A;
	private:
		import std.meta : AliasSeq;
		import std.traits : isSomeChar, isIntegral, Unqual;

		// Integral elements of size 1, 2 or 4 are reinterpreted as UTF-8, UTF-16
		// or UTF-32 code units respectively.
		static if (isSomeChar!InputElementType)
			alias Char = Unqual!InputElementType;
		else static if (isIntegral!InputElementType && InputElementType.sizeof <= 4)
			alias Char = AliasSeq!(void, char, wchar, void, dchar)[InputElementType.sizeof];
		static assert(is(Char),
			"Only streams of chars or bytes can be read by line, not " ~ InputElementType.stringof);

		// Current line as a slice of the source's peek buffer, separator included.
		// `null` means there are no more lines.
		const(Char)[] line;
		bool keepSeparator;
		static if (is(Separator : Char)) {
			Char separator;
			// Set once the last line (not followed by a separator) has been found.
			bool done;
		}
		else {
			// Reset to null when the end of the stream is reached without a separator.
			immutable(Char)[] separator;
		}

		// The constructor's own template parameter is deliberately not named `Separator`:
		// that would shadow the struct's `Separator`, and the branch below could then
		// disagree with the one that declared the `separator` field.
		public this(S)(typeof(null) dummy, S separator, bool keep)
		{
			// TODO: convert separator to array without GC allocation
			// TODO: optimize for single-char separator
			import std.conv : to;
			this.keepSeparator = keep;
			static if (is(Separator : Char)) {
				this.separator = separator;
			} else {
				this.separator = separator.to!(typeof(this.separator));
				// an empty separator would match before any data is read
				assert(this.separator.length > 0, "Separator must not be empty");
			}
			next();
		}

		// Consumes the current line from the source, then peeks further in steps of
		// `peekStep` until a separator or the end of the stream is found.
		void next()()
		{
			if (line.length)
				source.consume(line.length);
			line = cast(typeof(line)) source.peek(peekStep);
			static if (is(Separator : Char)) {
				size_t start = 0;
				for (;;) {
					import std.string : indexOf;
					auto i = line[start .. $].indexOf(separator);
					if (i >= 0) {
						line = line[0 .. start + i + 1];
						return;
					}
					// not found so far: peek more and resume where this search ended
					start = line.length;
					line = cast(const(Char)[]) source.peek(start + peekStep);
					if (line.length == start) {
						// no more data: whatever is left is the last, unterminated line
						done = true;
						if (line.length == 0)
							line = null;
						return;
					}
				}
			} else {
				for (size_t i = 0; ; i++) {
					if (line.length - i < separator.length) {
						line = cast(typeof(line)) source.peek(line.length + peekStep);
						if (line.length - i < separator.length) {
							separator = null;
							if (line.length == 0)
								line = null;
							return; // we've read everything, and haven't found separator
						}
					}
					if (line[i .. i + separator.length] == separator[]) {
						line = line[0 .. i + separator.length];
						return;
					}
				}
			}
		}

	public:
		/// True once every line has been returned.
		@property bool empty()()
		{
			return line is null;
		}

		/// The current line, with the separator removed unless `keep` was set.
		/// The slice is valid only until the next `popFront`.
		@property const(Char)[] front()()
		{
			static if (is(Separator : Char))
				return keepSeparator ? line : line[0 .. $ - (done ? 0 : 1)];
			else
				return keepSeparator ? line : line[0 .. $ - separator.length];
		}

		void popFront()()
		{
			static if (is(Separator : Char)) {
				if (done)
					line = null;
				else
					next();
			} else {
				if (separator.length)
					next();
				else
					line = null;
			}
		}
	}
}
unittest {
	import std.string : representation;

	// single-character separator, kept at the end of each line
	assert("Zażółć gęślą jaźń".pipe!(Splitter!(char, 3))(null, ' ', true)
		.equal(["Zażółć ", "gęślą ", "jaźń"]));
	// the same data as ubyte[] is interpreted as UTF-8
	assert("Zażółć gęślą jaźń".representation.pipe!(Splitter!(char, 3))(null, ' ', true)
		.equal(["Zażółć ", "gęślą ", "jaźń"]));
	// a dstring separator is converted to the stream's wchar type
	assert("Zażółć gęślą jaźń "w.pipe!(Splitter!(dstring, 5))(null, " "d, true)
		.equal(["Zażółć "w, "gęślą "w, "jaźń "w]));
	// map and filter decode the string into a sequence of dchars
	assert("여보세요 세계".map!"a".filter!(a => true).pipe!(Splitter!(string, 2))(null, " ", false)
		.equal(["여보세요"d, "세계"d]));
	// multi-character separator, removed from the lines; a lone '\r' is not a separator
	assert("Foo\r\nBar\r\nBaz\r\r\n\r\n".pipe!(Splitter!(wstring, 4))(null, "\r\n"w, false)
		.equal(["Foo", "Bar", "Baz\r", ""]));
}
/**
Returns a range that reads from the pipeline one line at a time.

Allowed input element types are built-in character types and integer types of size
1, 2 or 4. The stream is interpreted as UTF-8, UTF-16 or UTF-32 according to the input
element size. Range elements are arrays of the respective built-in character type.

Each `front` is valid only until `popFront` is called. If retention is needed,
a copy must be made using e.g. `idup` or `to!string`.

Params:
	schema = pipeline to read from.
	terminator = line terminator: a single character (`'\n'` by default)
		or a range of characters.
	keep_terminator = whether each line keeps its terminator.
*/
auto byLine(S, Terminator)(S schema, Terminator terminator = '\n',
	Flag!"keepTerminator" keep_terminator = No.keepTerminator)
	if (isSchema!S && isSomeChar!Terminator)
{
	alias Stage = Splitter!Terminator;
	return schema.pipe!Stage(null, terminator, keep_terminator);
}

/// ditto
auto byLine(S, Terminator)(S schema, Terminator terminator,
	Flag!"keepTerminator" keep_terminator = No.keepTerminator)
	if (isSchema!S && isInputRange!Terminator)
{
	alias Stage = Splitter!Terminator;
	return schema.pipe!Stage(null, terminator, keep_terminator);
}
///
unittest {
	import std.uni : toUpper;

	// a trailing terminator does not produce an extra empty line
	auto lines = "first\nsecond\nthird\n".byLine;
	assert(lines.equal(["first", "second", "third"]));

	// multi-character terminators work too; map decodes the string into dchars
	auto shouted = "zażółć\r\ngęślą\r\njaźń".map!toUpper.byLine("\r\n");
	assert(shouted.equal(["ZAŻÓŁĆ"d, "GĘŚLĄ"d, "JAŹŃ"d]));
}

unittest {
	import flod.adapter : peekPush;

	// the last line does not need a terminator
	auto lines = "first\nsecond\nthird".peekPush.byLine;
	assert(lines.equal(["first", "second", "third"]));
}

unittest {
	// For arrays of chars byLine should just give slices of the original array.
	// This is not part of the API, but an implementation detail with a positive
	// effect on performance.
	auto text = "just one line";
	assert(text.byLine.front is text);
}
unittest {
	import std.conv : to;
	import std.meta : AliasSeq;

	enum poem = q"EOF
Prześliczna dzieweczka na spacer raz szła
Gdy noc ją złapała wietrzysta i zła
Być może przestraszył by ziąb i mrok ją
Lecz miałą wszak mufkę prześliczną swą
EOF";
	string[] expected = [
		"Prześliczna dzieweczka na spacer raz szła",
		"Gdy noc ją złapała wietrzysta i zła",
		"Być może przestraszył by ziąb i mrok ją",
		"Lecz miałą wszak mufkę prześliczną swą",
	];
	// the same text must split identically in UTF-8, UTF-16 and UTF-32
	foreach (T; AliasSeq!(string, wstring, dstring))
		assert(poem.to!T.byLine.equal(expected.map!(to!T)));
}
/// Peek sink presenting the stream as a range of chunks sliced from the source's buffer.
@sink(Method.peek)
private struct PeekByChunk(alias Context, A...) {
	mixin Context!A;
private:
	alias E = InputElementType;

	// amount requested from `peek` when no chunk size was given
	enum size_t defaultPeek = 4096;

	const(E)[] chunk;
	// maximum chunk length; 0 means "whatever `peek(defaultPeek)` returns"
	size_t chunkSize;

public:
	this(size_t chunk_size)
	{
		chunkSize = chunk_size;
		popFront();
	}

	@property bool empty()() { return !chunk.length; }

	@property const(E)[] front()() { return chunk; }

	void popFront()()
	{
		if (chunk.length)
			source.consume(chunk.length);
		if (!chunkSize) {
			chunk = source.peek(defaultPeek);
			return;
		}
		chunk = source.peek(chunkSize);
		if (chunk.length > chunkSize)
			chunk = chunk[0 .. chunkSize];
	}
}
private template PullByChunk(E) {
	/// Pull sink reading the stream in chunks into a caller-supplied buffer.
	@sink!E(Method.pull)
	struct PullByChunk(alias Context, A...) {
		mixin Context!A;
	private:
		alias E = InputElementType;

		// The whole caller-supplied buffer; every pull may fill all of it.
		E[] buffer;
		// The part of `buffer` filled by the most recent pull.
		E[] chunk;

	public:
		this(E[] buffer)
		{
			this.buffer = buffer;
			popFront();
		}

		@property bool empty()() { return chunk.length == 0; }

		/// The most recently read chunk; overwritten by the next `popFront`.
		@property const(E)[] front()() { return chunk; }

		void popFront()()
		{
			// Always pull into the full buffer. Re-pulling into the previous chunk
			// would let one short read permanently cap the size of all later chunks.
			chunk = buffer[0 .. source.pull(buffer)];
		}
	}
}
/**
Returns a range that reads from the pipeline one chunk at a time.

The first overload slices chunks out of the data peeked from the pipeline. Each chunk holds
at most `chunk_size` elements; if `chunk_size` is 0, it holds whatever a single peek returns.
The second overload reads data into `buf`, so each chunk holds at most `buf.length` elements.
In both cases a chunk is valid only until `popFront` is called.
*/
auto byChunk(S)(S schema, size_t chunk_size = 0)
	if (isSchema!S)
{
	alias Stage = PeekByChunk;
	return schema.pipe!Stage(chunk_size);
}

/// ditto
auto byChunk(S, E)(S schema, E[] buf)
	if (isSchema!S)
{
	alias Stage = PullByChunk!E;
	return schema.pipe!Stage(buf);
}
///
unittest {
	auto arr = [ 42, 41, 40, 39, 38, 37, 36 ];

	// chunks of at most 2 elements
	assert(arr.byChunk(2).equal([[ 42, 41 ], [ 40, 39 ], [ 38, 37 ], [ 36 ]]));

	// chunks read into a caller-provided 3-element buffer
	int[3] buf;
	assert(arr.byChunk(buf[]).equal([[ 42, 41, 40 ], [ 39, 38, 37 ], [ 36 ]]));
}