/** Various buffer implementations.
 *
 *  Authors: $(LINK2 https://github.com/epi, Adrian Matoga)
 *  Copyright: © 2016 Adrian Matoga
 *  License: $(LINK2 http://www.boost.org/users/license.html, BSL-1.0).
 */
module flod.buffer;

// Rounds n up to the nearest multiple of al; al must be a power of 2.
private size_t alignUp(size_t n, size_t al)
{
	return (n + al - 1) & ~(al - 1);
}
static assert(13.alignUp(4) == 16);
static assert(31337.alignUp(4096) == 32768);

private size_t goodSize(Allocator)(ref Allocator allocator, size_t n)
{
	static if (is(typeof(allocator.goodAllocSize(n)) : size_t))
		return allocator.goodAllocSize(n);
	else
		return n.alignUp(size_t.sizeof);
}

import std.experimental.allocator.mallocator : Mallocator;

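// A quick sanity check of goodSize. An assumption of this sketch: Mallocator
// does not define goodAllocSize, so goodSize falls back to word alignment.
unittest {
	assert(goodSize(Mallocator.instance, 13) == 13.alignUp(size_t.sizeof));
}
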
/// A buffer that discards all data written to it and always returns an empty slice.
struct NullBuffer {
private:
	void[] buffer;
public:
	~this() { Mallocator.instance.deallocate(buffer); }
	T[] alloc(T)(size_t n)
	{
		// reuse a single scratch block, released in the destructor
		size_t tn = T.sizeof * n;
		if (buffer.length < tn)
			Mallocator.instance.reallocate(buffer, tn);
		return cast(T[]) buffer[0 .. tn];
	}
	void commit(T)(size_t n) {}
	const(T)[] peek(T)() { return null; }
	void consume(T)(size_t n) {}
}
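
/// A minimal usage sketch: writes are accepted, but nothing is ever peekable.
unittest {
	NullBuffer b;
	auto chunk = b.alloc!uint(10);
	assert(chunk.length >= 10);
	b.commit!uint(10);
	assert(b.peek!uint().length == 0);
	b.consume!uint(0);
}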

/**
A buffer that relies on moving chunks of data in memory
to ensure that contiguous slices of any requested size can always be provided.
Params:
 Allocator = _Allocator used for internal storage allocation.
*/
struct MovingBuffer(Allocator = Mallocator) {
private:
	import core.exception : OutOfMemoryError;

	void[] buffer;
	size_t peekOffset;
	size_t allocOffset;
	Allocator allocator;

	invariant {
		assert(peekOffset <= allocOffset);
		assert(allocOffset <= buffer.length);
	}

public:
	this()(auto ref Allocator allocator, size_t initialSize = 0)
	{
		import flod.meta : moveIfNonCopyable;
		this.allocator = moveIfNonCopyable(allocator);
		// use the field, not the parameter, which may have been moved from
		if (initialSize > 0)
			buffer = this.allocator.allocate(this.allocator.goodSize(initialSize));
	}

	~this()
	{
		allocator.deallocate(buffer);
	}

	this(this)
	{
		assert(buffer == null);
	}

	/// Allocates space for at least `n` new objects of type `T` to be written to the buffer.
	T[] alloc(T)(size_t n)
	{
		import std.experimental.allocator : reallocate;
		import core.stdc.string : memmove;
		size_t tn = T.sizeof * n;
		if (buffer.length - allocOffset >= tn)
			return cast(T[]) buffer[allocOffset .. $];
		memmove(buffer.ptr, buffer.ptr + peekOffset, allocOffset - peekOffset);
		allocOffset -= peekOffset;
		peekOffset = 0;
		size_t newSize = goodSize(allocator, allocOffset + tn);
		if (buffer.length < newSize)
			allocator.reallocate(buffer, newSize);
		assert(buffer.length - allocOffset >= tn); // TODO: let it return smaller chunk and the user will handle it
		return cast(T[]) buffer[allocOffset .. $];
	}

	/// Commits the first `n` objects of type `T` stored in the slice previously obtained using `alloc`,
	/// making them available for reading. Does not touch the remaining part of that slice.
	void commit(T)(size_t n)
	{
		size_t tn = T.sizeof * n;
		allocOffset += tn;
		assert(allocOffset <= buffer.length);
	}

	/// Returns a read-only slice, typed as `const(T)[]`, containing all data currently available in the buffer.
	const(T)[] peek(T)()
	{
		return cast(const(T)[]) buffer[peekOffset .. allocOffset];
	}

	/// Removes the first `n` objects of type `T` from the buffer.
	void consume(T)(size_t n)
	{
		size_t tn = T.sizeof * n;
		peekOffset += tn;
		assert(peekOffset <= allocOffset);
		if (peekOffset == buffer.length) {
			peekOffset = 0;
			allocOffset = 0;
		}
	}
}

///
auto movingBuffer(Allocator)(auto ref Allocator allocator)
{
	return MovingBuffer!Allocator(allocator);
}

///
auto movingBuffer()
{
	import std.experimental.allocator.mallocator : Mallocator;
	return movingBuffer(Mallocator.instance);
}

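/// A short sketch of the alloc/commit/peek/consume protocol on a MovingBuffer.
unittest {
	auto buf = movingBuffer();
	auto chunk = buf.alloc!char(5);  // get a writable slice...
	chunk[0 .. 5] = "hello";         // ...fill it...
	buf.commit!char(5);              // ...and publish the written part
	assert(buf.peek!char() == "hello");
	buf.consume!char(5);             // discard what has been read
	assert(buf.peek!char().length == 0);
}
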
version(unittest) {
	private void testBuffer(Buffer)(auto ref Buffer b)
	{
		import std.range : iota, array, put;
		import std.algorithm : copy;
		static assert(is(typeof(b)));
		assert(b.peek!uint().length == 0);
		b.consume!uint(0);
		auto chunk = b.alloc!uint(1);
		assert(chunk.length >= 1);
		assert(b.peek!uint().length == 0);
		chunk = b.alloc!uint(31337);
		assert(chunk.length >= 31337);
		auto arr = iota!uint(0, cast(uint) chunk.length).array();
		iota!uint(0, cast(uint) chunk.length).copy(chunk[0 .. $]);
		b.commit!uint(1312);
		assert(b.peek!uint()[] == arr[0 .. 1312]);
		b.commit!uint(chunk.length - 1312);
		assert(b.peek!uint()[] == arr[]);
		b.consume!uint(0);
		assert(b.peek!uint()[] == arr[]);
		b.consume!uint(15);
		assert(b.peek!uint()[] == arr[15 .. $]);
		// TODO: put more stress on the buffer
	}
}

unittest {
	auto b = movingBuffer();
	testBuffer(b);
	// consume everything and check that b resets its offsets.
	b.consume!uint(b.peek!uint().length);
	assert(b.peek!uint().length == 0);
	assert(b.allocOffset == 0);
	assert(b.peekOffset == 0);
}

/**
A circular buffer which avoids moving data around, but instead maps the same physical memory block twice
into two adjacent virtual memory blocks.
It $(U does) move data blocks when growing the buffer.
*/
struct MmappedBuffer {
private:
	enum pageSize = 4096;
	import flod.meta : NonCopyable;
	mixin NonCopyable;

	import core.sys.posix.stdlib : mkstemp;
	import core.sys.posix.unistd : close, unlink, ftruncate;
	import core.sys.posix.sys.mman : mmap, munmap,
		MAP_ANON, MAP_PRIVATE, MAP_FIXED, MAP_SHARED, MAP_FAILED, PROT_WRITE, PROT_READ;

	void[] buffer;
	size_t peekOffset;
	size_t peekableLength;
	int fd = -1;
	ulong wraps = 0;

	@property size_t allocOffset() const pure nothrow
	{
		auto ao = peekOffset + peekableLength;
		if (ao <= buffer.length)
			return ao;
		return ao - buffer.length;
	}

	@property size_t allocableLength() const pure nothrow { return buffer.length - peekableLength; }

	invariant {
		assert(peekOffset <= buffer.length);
	}

	this(size_t initialSize)
	{
		if (!createFile())
			return;
		if (initialSize)
			buffer = allocate(initialSize);
	}

	bool createFile()()
	{
		static immutable path = "/dev/shm/flod-buffer-XXXXXX";
		// mkstemp modifies the template in place, so pass a mutable copy,
		// including the terminating null byte.
		char[path.length + 1] mutablePath = path.ptr[0 .. path.length + 1];
		fd = mkstemp(mutablePath.ptr);
		if (fd < 0)
			return false;
		// the file remains accessible through fd after unlinking
		if (unlink(mutablePath.ptr) != 0) {
			close(fd);
			fd = -1;
			return false;
		}
		return true;
	}

	void[] allocate(size_t length)
	{
		length = length.alignUp(pageSize);
		if (fd < 0)
			return null;
		if (ftruncate(fd, length) != 0)
			return null;

		// first, make sure that a contiguous virtual memory range of 2 * length bytes is available
		void* anon = mmap(null, length * 2, PROT_READ | PROT_WRITE, MAP_ANON | MAP_PRIVATE, -1, 0);
		if (anon == MAP_FAILED)
			return null;

		// then map the two halves onto the same range of physical memory
		void* p = mmap(anon, length, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
		if (p == MAP_FAILED) {
			munmap(anon, length * 2);
			return null;
		}
		assert(p == anon);
		p = mmap(anon + length, length, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
		if (p == MAP_FAILED) {
			munmap(anon, length * 2);
			return null;
		}
		assert(p == anon + length);
		return anon[0 .. length];
	}

	bool reallocate(size_t length)
	{
		if (length == buffer.length)
			return true;
		auto newbuf = allocate(length);
		if (!newbuf)
			return false;
		// Thanks to the double mapping, the peekable data can be read as one
		// contiguous block even if it wraps around the end of the old buffer,
		// and it is copied to the same offsets in the new buffer, so both
		// peekOffset and peekableLength remain valid as they are.
		newbuf.ptr[peekOffset .. peekOffset + peekableLength] = buffer.ptr[peekOffset .. peekOffset + peekableLength];
		deallocate(buffer);
		buffer = newbuf;
		return true;
	}

	static void deallocate(ref void[] b)
	{
		if (b.ptr) {
			munmap(b.ptr, b.length << 1);
			b = null;
		}
	}

public:
	~this()
	{
		deallocate(buffer);
		if (fd >= 0) {
			close(fd);
			fd = -1;
		}
	}

	/// Returns a read-only slice, typed as `const(T)[]`, containing all data currently available in the buffer.
	const(T)[] peek(T)()
	{
		auto typed = cast(const(T)*) (buffer.ptr + peekOffset);
		auto count = peekableLength / T.sizeof;
		return typed[0 .. count];
	}

	/// Removes the first `n` objects of type `T` from the buffer.
	void consume(T)(size_t n)
	{
		size_t tn = T.sizeof * n;
		assert(peekableLength >= tn);
		peekOffset += tn;
		peekableLength -= tn;
		if (peekOffset >= buffer.length) {
			peekOffset -= buffer.length;
			wraps++;
		}
	}

	/// Allocates space for at least `n` new objects of type `T` to be written to the buffer.
	T[] alloc(T)(size_t n)
	{
		auto typed = cast(T*) (buffer.ptr + allocOffset);
		auto count = allocableLength / T.sizeof;
		if (n <= count)
			return typed[0 .. count];
		// make sure at least T[n] will be available behind what's currently peekable
		reallocate(peekOffset + peekableLength + n * T.sizeof);
		typed = cast(T*) (buffer.ptr + allocOffset);
		count = allocableLength / T.sizeof;
		assert(count >= n); // TODO: let it return smaller chunk and the user will handle it
		return typed[0 .. count];
	}

	/// Commits the first `n` objects of type `T` stored in the slice previously obtained using `alloc`,
	/// making them available for reading. Does not touch the remaining part of that slice.
	void commit(T)(size_t n)
	{
		size_t tn = T.sizeof * n;
		assert(tn <= allocableLength);
		peekableLength += tn;
	}
}

///
auto mmappedBuffer(size_t initialSize = 0)
{
	return MmappedBuffer(initialSize);
}

unittest {
	testBuffer(mmappedBuffer());
}
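
// A sketch of the wrap-around behavior: once the offsets pass the end of the
// buffer, a committed block that crosses the boundary is still returned by
// peek as a single contiguous slice, thanks to the double mapping.
unittest {
	auto b = mmappedBuffer(4096);
	auto chunk = b.alloc!ubyte(4096);
	assert(chunk.length >= 4096);
	b.commit!ubyte(4000);
	b.consume!ubyte(4000);
	// the next block starts 96 bytes before the end of the 4096-byte buffer
	chunk = b.alloc!ubyte(100);
	foreach (i; 0 .. 100)
		chunk[i] = cast(ubyte) i;
	b.commit!ubyte(100);
	auto data = b.peek!ubyte();
	assert(data.length == 100);
	foreach (i; 0 .. 100)
		assert(data[i] == i);
}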