1 /** Adapters connecting stages with incompatible interfaces.
2  *
3  *  Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
4  *  Copyright: © 2016 Adrian Matoga
5  *  License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
6  */
7 module flod.adapter;
8 
9 import flod.pipeline;
10 import flod.traits;
11 
12 private template DefaultPullPeekAdapter(Buffer, E) {
13 	@pullSink!E @peekSource!E
14 	struct DefaultPullPeekAdapter(Source) {
15 		Source source;
16 		Buffer buffer;
17 
18 		this()(auto ref Buffer buffer)
19 		{
20 			this.buffer = buffer;
21 		}
22 
23 		const(E)[] peek(size_t size)
24 		{
25 			auto ready = buffer.peek!E();
26 			if (ready.length >= size)
27 				return ready;
28 			auto chunk = buffer.alloc!E(size - ready.length);
29 			size_t r = source.pull(chunk);
30 			buffer.commit!E(r);
31 			return buffer.peek!E();
32 		}
33 
34 		void consume(size_t size)
35 		{
36 			buffer.consume!E(size);
37 		}
38 	}
39 }
40 
41 ///
42 auto pullPeek(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
43 	if (isPullPipeline!Pipeline)
44 {
45 	return pipeline.pipe!(DefaultPullPeekAdapter!(Buffer, Pipeline.ElementType))(buffer);
46 }
47 
48 ///
49 auto pullPeek(Pipeline)(auto ref Pipeline pipeline)
50 	if (isPullPipeline!Pipeline)
51 {
52 	import flod.buffer : movingBuffer;
53 	return pipeline.pullPeek(movingBuffer());
54 }
55 
56 private template DefaultPeekPullAdapter(E) {
57 	@peekSink!E @pullSource!E
58 	struct DefaultPeekPullAdapter(Source) {
59 		Source source;
60 
61 		size_t pull(E[] buf)
62 		{
63 			import std.algorithm : min;
64 			auto inbuf = source.peek(buf.length);
65 			auto l = min(buf.length, inbuf.length);
66 			buf[0 .. l] = inbuf[0 .. l];
67 			source.consume(l);
68 			return l;
69 		}
70 	}
71 }
72 
73 ///
74 auto peekPull(Pipeline)(auto ref Pipeline pipeline)
75 	if (isPeekPipeline!Pipeline)
76 {
77 	return pipeline.pipe!(DefaultPeekPullAdapter!(Pipeline.ElementType));
78 }
79 
80 private template DefaultPullPushAdapter(E) {
81 	@pullSink!E @pushSource!E
82 	struct DefaultPullPushAdapter(Source, Sink) {
83 		Source source;
84 		Sink sink;
85 		size_t chunkSize;
86 
87 		this(size_t chunkSize)
88 		{
89 			this.chunkSize = chunkSize;
90 		}
91 
92 		void run()()
93 		{
94 			import core.stdc.stdlib : alloca;
95 			auto buf = (cast(E*) alloca(E.sizeof * chunkSize))[0 .. chunkSize];
96 			for (;;) {
97 				size_t inp = source.pull(buf[]);
98 				if (inp == 0)
99 					break;
100 				if (sink.push(buf[0 .. inp]) < chunkSize)
101 					break;
102 			}
103 		}
104 	}
105 }
106 
107 ///
108 auto pullPush(Pipeline)(auto ref Pipeline pipeline, size_t chunkSize = 4096)
109 	if (isPullPipeline!Pipeline)
110 {
111 	return pipeline.pipe!(DefaultPullPushAdapter!(Pipeline.ElementType))(chunkSize);
112 }
113 
114 private template DefaultPullAllocAdapter(E) {
115 	@pullSink!E @allocSource!E
116 	struct DefaultPullAllocAdapter(Source, Sink) {
117 		Source source;
118 		Sink sink;
119 		size_t chunkSize;
120 
121 		this(size_t chunkSize)
122 		{
123 			this.chunkSize = chunkSize;
124 		}
125 
126 		void run()()
127 		{
128 			E[] buf;
129 			for (;;) {
130 				if (!sink.alloc(buf, chunkSize)) {
131 					import core.exception : OutOfMemoryError;
132 					throw new OutOfMemoryError;
133 				}
134 				size_t inp = source.pull(buf[]);
135 				if (inp == 0)
136 					break;
137 				if (sink.commit(inp) < chunkSize)
138 					break;
139 			}
140 		}
141 	}
142 }
143 
144 ///
145 auto pullAlloc(Pipeline)(auto ref Pipeline pipeline, size_t chunkSize = 4096)
146 	if (isPullPipeline!Pipeline)
147 {
148 	return pipeline.pipe!(DefaultPullAllocAdapter!(Pipeline.ElementType))(chunkSize);
149 }
150 
151 private template DefaultPeekPushAdapter(E) {
152 	@peekSink!E @pushSource!E
153 	struct DefaultPeekPushAdapter(Source, Sink) {
154 		Source source;
155 		Sink sink;
156 		size_t minSliceSize;
157 
158 		this(size_t minSliceSize)
159 		{
160 			this.minSliceSize = minSliceSize;
161 		}
162 
163 		void run()()
164 		{
165 			for (;;) {
166 				auto buf = source.peek(minSliceSize);
167 				if (buf.length == 0)
168 					break;
169 				size_t w = sink.push(buf[]);
170 				if (w < minSliceSize)
171 					break;
172 				assert(w <= buf.length);
173 				source.consume(w);
174 			}
175 		}
176 	}
177 }
178 
179 ///
180 auto peekPush(Pipeline)(auto ref Pipeline pipeline, size_t minSliceSize = size_t.sizeof)
181 	if (isPeekPipeline!Pipeline)
182 {
183 	return pipeline.pipe!(DefaultPeekPushAdapter!(Pipeline.ElementType))(minSliceSize);
184 }
185 
186 private template DefaultPeekAllocAdapter(E) {
187 	@peekSink!E @allocSource!E
188 	struct DefaultPeekAllocAdapter(Source, Sink) {
189 		Source source;
190 		Sink sink;
191 		size_t minSliceSize;
192 		size_t maxSliceSize;
193 
194 		this(size_t minSliceSize, size_t maxSliceSize)
195 		{
196 			this.minSliceSize = minSliceSize;
197 			this.maxSliceSize = maxSliceSize;
198 		}
199 
200 		void run()()
201 		{
202 			E[] ob;
203 			for (;;) {
204 				auto ib = source.peek(minSliceSize);
205 				if (ib.length == 0)
206 					break;
207 				auto len = min(ib.length, maxSliceSize);
208 				if (!sink.alloc(ob, len)) {
209 					import core.exception : OutOfMemoryError;
210 					throw new OutOfMemoryError();
211 				}
212 				ob[0 .. len] = ib[0 .. len];
213 				size_t w = sink.commit(len);
214 				source.consume(w);
215 				if (w < minSliceSize)
216 					break;
217 			}
218 		}
219 	}
220 }
221 
222 ///
223 auto peekAlloc(Pipeline)(auto ref Pipeline pipeline, size_t minSliceSize = size_t.sizeof, size_t maxSliceSize = 4096)
224 	if (isPeekPipeline!Pipeline)
225 {
226 	return pipeline.pipe!(DefaultPeekAllocAdapter!(Pipeline.ElementType))(minSliceSize, maxSliceSize);
227 }
228 
229 private template DefaultPushAllocAdapter(E) {
230 	@pushSink!E @allocSource!E
231 	struct DefaultPushAllocAdapter(Sink) {
232 		Sink sink;
233 
234 		size_t push(const(E)[] buf)
235 		{
236 			E[] ob;
237 			if (!sink.alloc(ob, buf.length)) {
238 				import core.exception : OutOfMemoryError;
239 				throw new OutOfMemoryError();
240 			}
241 			ob[0 .. buf.length] = buf[];
242 			return sink.commit(buf.length);
243 		}
244 	}
245 }
246 
247 ///
248 auto pushAlloc(Pipeline)(auto ref Pipeline pipeline)
249 	if (isPushPipeline!Pipeline)
250 {
251 	alias E = Pipeline.ElementType;
252 	alias PP = DefaultPushAllocAdapter!(E);
253 	return pipeline.pipe!PP();
254 }
255 
256 private template DefaultPushPullAdapter(Buffer, E) {
257 	@pushSink!E @pullSource!E
258 	struct DefaultPushPullAdapter(alias Scheduler) {
259 		import std.algorithm : min;
260 
261 		mixin Scheduler;
262 
263 		Buffer buffer;
264 		const(E)[] pushed;
265 
266 		this()(auto ref Buffer buffer)
267 		{
268 			this.buffer = buffer;
269 		}
270 
271 		size_t push(const(E)[] buf)
272 		{
273 			if (pushed.length > 0)
274 				return 0;
275 			pushed = buf;
276 			yield();
277 			return buf.length;
278 		}
279 
280 		private E[] pullFromBuffer(E[] dest)
281 		{
282 			auto src = buffer.peek!E();
283 			auto len = min(src.length, dest.length);
284 			if (len > 0) {
285 				dest[0 .. len] = src[0 .. len];
286 				buffer.consume!E(len);
287 				return dest[len .. $];
288 			}
289 			return dest;
290 		}
291 
292 		size_t pull(E[] dest)
293 		{
294 			size_t requestedLength = dest.length;
295 			// first, give off whatever was left from this.pushed on previous pull();
296 			dest = pullFromBuffer(dest);
297 			if (dest.length == 0)
298 				return requestedLength;
299 			// if not satisfied yet, switch to source fiber till push() is called again
300 			// enough times to fill dest[]
301 			do {
302 				if (yield())
303 					break;
304 				// pushed is the slice of the original buffer passed to push() by the source.
305 				auto len = min(pushed.length, dest.length);
306 				assert(len > 0);
307 				dest[0 .. len] = pushed[0 .. len];
308 				dest = dest[len .. $];
309 				pushed = pushed[len .. $];
310 			} while (dest.length > 0);
311 
312 			// whatever's left in pushed, keep it in buffer for the next time pull() is called
313 			while (pushed.length > 0) {
314 				auto b = buffer.alloc!E(pushed.length);
315 				if (b.length == 0) {
316 					import core.exception : OutOfMemoryError;
317 					throw new OutOfMemoryError();
318 				}
319 				auto len = (b.length, pushed.length);
320 				b[0 .. len] = pushed[0 .. len];
321 				buffer.commit!E(len);
322 				pushed = pushed[len .. $];
323 			}
324 			return requestedLength - dest.length;
325 		}
326 	}
327 }
328 
329 ///
330 auto pushPull(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
331 {
332 	alias E = Pipeline.ElementType;
333 	alias PP = DefaultPushPullAdapter!(Buffer, E);
334 	return pipeline.pipe!PP(buffer);
335 }
336 
337 ///
338 auto pushPull(Pipeline)(auto ref Pipeline pipeline)
339 {
340 	import flod.buffer : movingBuffer;
341 	return pipeline.pushPull(movingBuffer());
342 }
343 
344 private template ImplementPeekConsume(E) {
345 	const(E)[] peek(size_t n)
346 	{
347 		const(E)[] result;
348 		for (;;) {
349 			result = buffer.peek!E;
350 			if (result.length >= n)
351 				break;
352 			if (yield())
353 				break;
354 		}
355 		return result;
356 	}
357 
358 	void consume(size_t n)
359 	{
360 		buffer.consume!E(n);
361 	}
362 }
363 
364 private template ImplementAllocCommit(E) {
365 	bool alloc(ref E[] buf, size_t n)
366 	{
367 		buf = buffer.alloc!E(n);
368 		if (!buf || buf.length < n)
369 			return false;
370 		return true;
371 	}
372 
373 	size_t commit(size_t n)
374 	{
375 		buffer.commit!E(n);
376 		if (yield())
377 			return 0;
378 		return n;
379 	}
380 }
381 
382 private template DefaultPushPeekAdapter(Buffer, E) {
383 	@pushSink!E @peekSource!E
384 	struct DefaultPushPeekAdapter(alias Scheduler) {
385 		import std.algorithm : min;
386 		mixin Scheduler;
387 		Buffer buffer;
388 
389 		this()(auto ref Buffer buffer)
390 		{
391 			this.buffer = buffer;
392 		}
393 
394 		size_t push(const(E)[] buf)
395 		{
396 			size_t n = buf.length;
397 			auto ob = buffer.alloc!E(n);
398 			if (ob.length < n) {
399 				import core.exception : OutOfMemoryError;
400 				throw new OutOfMemoryError();
401 			}
402 			ob[0 .. n] = buf[0 .. n];
403 			buffer.commit!E(n);
404 			if (yield())
405 				return 0;
406 			return n;
407 		}
408 
409 		mixin ImplementPeekConsume!E;
410 	}
411 }
412 
413 
414 ///
415 auto pushPeek(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
416 {
417 	alias E = Pipeline.ElementType;
418 	alias PP = DefaultPushPeekAdapter!(Buffer, E);
419 	return pipeline.pipe!PP(buffer);
420 }
421 
422 ///
423 auto pushPeek(Pipeline)(auto ref Pipeline pipeline)
424 {
425 	import flod.buffer : movingBuffer;
426 	return pipeline.pushPeek(movingBuffer());
427 }
428 
429 private template DefaultAllocPeekAdapter(Buffer, E) {
430 	@allocSink!E @peekSource!E
431 	struct DefaultAllocPeekAdapter(alias Scheduler) {
432 		import std.algorithm : min;
433 		mixin Scheduler;
434 		Buffer buffer;
435 
436 		this()(auto ref Buffer buffer)
437 		{
438 			this.buffer = buffer;
439 		}
440 
441 		mixin ImplementAllocCommit!E;
442 		mixin ImplementPeekConsume!E;
443 	}
444 }
445 
446 ///
447 auto allocPeek(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
448 	if (isAllocPipeline!Pipeline)
449 {
450 	alias E = Pipeline.ElementType;
451 	alias PP = DefaultAllocPeekAdapter!(Buffer, E);
452 	return pipeline.pipe!PP(buffer);
453 }
454 
455 ///
456 auto allocPeek(Pipeline)(auto ref Pipeline pipeline)
457 	if (isAllocPipeline!Pipeline)
458 {
459 	import flod.buffer : movingBuffer;
460 	return pipeline.allocPeek(movingBuffer());
461 }
462 
463 private template DefaultAllocPullAdapter(Buffer, E) {
464 	@allocSink!E @pullSource!E
465 	struct DefaultAllocPullAdapter(alias Scheduler) {
466 		import std.algorithm : min;
467 		mixin Scheduler;
468 		Buffer buffer;
469 
470 		this()(auto ref Buffer buffer)
471 		{
472 			this.buffer = buffer;
473 		}
474 
475 		mixin ImplementAllocCommit!E;
476 
477 		size_t pull(E[] buf)
478 		{
479 			const(E)[] ib;
480 			for (;;) {
481 				ib = buffer.peek!E;
482 				if (ib.length >= buf.length)
483 					break;
484 				if (yield())
485 					break;
486 			}
487 			auto len = min(ib.length, buf.length);
488 			buf[0 .. len] = ib[0 .. len];
489 			buffer.consume!E(len);
490 			return len;
491 		}
492 	}
493 }
494 
495 ///
496 auto allocPull(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
497 	if (isAllocPipeline!Pipeline)
498 {
499 	alias E = Pipeline.ElementType;
500 	alias PP = DefaultAllocPullAdapter!(Buffer, E);
501 	return pipeline.pipe!PP(buffer);
502 }
503 
504 ///
505 auto allocPull(Pipeline)(auto ref Pipeline pipeline)
506 	if (isAllocPipeline!Pipeline)
507 {
508 	import flod.buffer : movingBuffer;
509 	return pipeline.allocPull(movingBuffer());
510 }
511 
512 private template DefaultAllocPushAdapter(Buffer, E) {
513 	@allocSink!E @pushSource!E
514 	struct DefaultAllocPushAdapter(Sink) {
515 		Sink sink;
516 		Buffer buffer;
517 
518 		this()(auto ref Buffer buffer)
519 		{
520 			this.buffer = buffer;
521 		}
522 
523 		bool alloc(ref E[] buf, size_t n)
524 		{
525 			buf = buffer.alloc!E(n);
526 			if (!buf || buf.length < n)
527 				return false;
528 			return true;
529 		}
530 
531 		size_t commit(size_t n)
532 		{
533 			buffer.commit!E(n);
534 			sink.push(buffer.peek!E[0 .. n]);
535 			buffer.consume!E(n);
536 			return n;
537 		}
538 	}
539 }
540 
541 ///
542 auto allocPush(Pipeline, Buffer)(auto ref Pipeline pipeline, auto ref Buffer buffer)
543 	if (isAllocPipeline!Pipeline)
544 {
545 	alias E = Pipeline.ElementType;
546 	alias PP = DefaultAllocPushAdapter!(Buffer, E);
547 	return pipeline.pipe!PP(buffer);
548 }
549 
550 ///
551 auto allocPush(Pipeline)(auto ref Pipeline pipeline)
552 	if (isAllocPipeline!Pipeline)
553 {
554 	import flod.buffer : movingBuffer;
555 	return pipeline.allocPush(movingBuffer());
556 }