1 /*
2  *Copyright (C) 2018 Laurent Tréguier
3  *
4  *This file is part of DLS.
5  *
6  *DLS is free software: you can redistribute it and/or modify
7  *it under the terms of the GNU General Public License as published by
8  *the Free Software Foundation, either version 3 of the License, or
9  *(at your option) any later version.
10  *
11  *DLS is distributed in the hope that it will be useful,
12  *but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *GNU General Public License for more details.
15  *
16  *You should have received a copy of the GNU General Public License
17  *along with DLS.  If not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20 
21 module dls.util.communicator;
22 
23 import std.stdio : File;
24 
// A Socket can't be accessed through a `shared`-qualified reference.
// However, the communicator's methods are all called either from the main
// thread, or from inside a synchronized block, so __gshared is ok.
private __gshared
{
    // Active communicator; null until one is installed through the setter
    Communicator _communicator;
    // Copies of the standard streams taken by the module constructor
    File _stdin;
    File _stdout;
}
31 
// Module constructor: keeps copies of the current standard input/output in
// _stdin/_stdout, then points the global stdin/stdout at the platform's null
// device (on Windows and Posix only; other platforms are left untouched).
shared static this()
{
    import std.stdio : stdin, stdout;

    // Path of the null device, only defined on platforms where it is known
    version (Windows)
    {
        enum nullDevice = "NUL";
    }
    else version (Posix)
    {
        enum nullDevice = "/dev/null";
    }

    _stdin = stdin;
    _stdout = stdout;

    static if (is(typeof(nullDevice)))
    {
        stdin = File(nullDevice, "rb");
        stdout = File(nullDevice, "wb");
    }
}
50 
// Module destructor: destroys the installed communicator, if there is one.
shared static ~this()
{
    if (_communicator is null)
    {
        return;
    }

    destroy(_communicator);
}
58 
/// Returns: the communicator currently installed for this module, or `null`
/// when none has been set yet.
@property Communicator communicator()
{
    return _communicator;
}

/// Installs `c` as the module's communicator.
///
/// The assertion documents that a communicator is only expected to be
/// installed once; it is checked only in builds where assertions are enabled.
@property void communicator(Communicator c)
{
    assert(_communicator is null);
    _communicator = c;
}
69 
/// Abstraction over the channel used to exchange raw character data.
interface Communicator
{
    /// Returns: `true` while the underlying channel can still be read from.
    bool hasData();

    /// Returns: `true` when some data is already available for reading.
    bool hasPendingData();

    /// Reads characters from the channel.
    /// Params:
    ///     size = number of characters requested
    /// Returns: the characters that were read
    char[] read(const size_t size);

    /// Writes `data` to the channel.
    void write(const char[] data);

    /// Flushes any output buffered by the channel.
    void flush();
}
78 
/// Communicator exchanging data over the standard streams saved in
/// `_stdin`/`_stdout`.
///
/// When pending-data checking is enabled, a one-worker daemon `TaskPool` reads
/// the next character in the background so that `hasPendingData` can be
/// answered without blocking on the input stream.
class StdioCommunicator : Communicator
{
    import std.parallelism : Task, TaskPool;

    private bool _checkPending;
    private TaskPool _pool;
    // Background read of the next character; null when no read is in flight
    private Task!(readChar)* _background;

    /// Reads a single character from the saved standard input.
    /// Throws: `Exception` when no character could be read.
    static char readChar()
    {
        static char[1] buffer;
        auto result = _stdin.rawRead(buffer);

        if (result.length > 0)
        {
            return result[0];
        }

        throw new Exception("No input data");
    }

    /// Params:
    ///     checkPendingData = when `true`, a background task is used to
    ///                        detect pending input
    this(bool checkPendingData)
    {
        _checkPending = checkPendingData;

        if (checkPendingData)
        {
            _pool = new TaskPool(1);
            _pool.isDaemon = true;
            startBackground();
        }
    }

    ~this()
    {
        if (_checkPending)
        {
            _pool.stop();
        }
    }

    /// Returns: `true` while the saved standard input is open and not at EOF.
    bool hasData()
    {
        return _stdin.isOpen && !_stdin.eof;
    }

    /// Returns: `true` when the background read has completed successfully,
    /// i.e. at least one character is waiting to be consumed by `read`.
    bool hasPendingData()
    {
        // No task in flight (checking disabled, or input was exhausted when
        // the last task would have been started): nothing can be pending.
        if (!_checkPending || _background is null)
        {
            return false;
        }

        try
        {
            // `done` rethrows the exception raised by the task, if any
            return _background.done;
        }
        catch (Exception e)
        {
            return false;
        }
    }

    /// Reads up to `size` characters from the saved standard input.
    ///
    /// The returned slice refers to a thread-local buffer that is reused by
    /// the next call.
    /// Params:
    ///     size = maximum number of characters to read
    /// Returns: the characters read; may be shorter than `size`, or empty
    char[] read(const size_t size)
    {
        if (size == 0)
        {
            return [];
        }

        static char[] buffer;
        buffer.length = size;

        if (!_checkPending)
        {
            return _stdin.rawRead(buffer);
        }

        // Whatever happens below, the current task is consumed: schedule the
        // next background read (or clear the task if input is exhausted).
        scope(exit)
        {
            startBackground();
        }

        // No background read was started: fall back to a direct read
        if (_background is null)
        {
            return hasData() ? _stdin.rawRead(buffer) : [];
        }

        try
        {
            // The first character comes from the background read
            buffer[0] = _background.yieldForce();
        }
        catch (Exception e)
        {
            return hasData() ? _stdin.rawRead(buffer) : [];
        }

        if (size > 1)
        {
            // Read the remainder directly; keep only what was actually read
            buffer = buffer[0 .. _stdin.rawRead(buffer[1 .. $]).length + 1];
        }

        return buffer;
    }

    /// Writes `data` as-is to the saved standard output.
    void write(const char[] data)
    {
        _stdout.rawWrite(data);
    }

    /// Flushes the saved standard output.
    void flush()
    {
        _stdout.flush();
    }

    // Starts a new background read of one character when pending-data
    // checking is enabled and input is still available.
    private void startBackground()
    {
        import std.parallelism : task;

        // Always drop the previous task: its result has been consumed, and
        // keeping it around would make hasPendingData()/read() report and
        // return the same stale character again once input is exhausted.
        _background = null;

        if (_checkPending && hasData())
        {
            _background = task!readChar;
            _pool.put(_background);
        }
    }
}
195 
/// Communicator exchanging data over a TCP connection to `localhost`.
///
/// Every socket operation is wrapped in `synchronized (_socket)`.
class SocketCommunicator : Communicator
{
    import std.socket : Socket;

    private Socket _socket;

    /// Connects to `localhost` on the given port and disables Nagle's
    /// algorithm (TCP_NODELAY) on the connection.
    /// Params:
    ///     port = TCP port to connect to
    this(ushort port)
    {
        import std.socket : InternetAddress, SocketOption, SocketOptionLevel,
            TcpSocket;

        _socket = new TcpSocket(new InternetAddress("localhost", port));
        _socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
    }

    /// Returns: `true` while the socket reports itself as alive.
    bool hasData()
    {
        synchronized (_socket)
        {
            return _socket.isAlive;
        }
    }

    /// Returns: `true` when at least one byte can be received right now.
    bool hasPendingData()
    {
        import std.socket : SocketFlags;

        static char[1] buffer;
        ptrdiff_t result;

        synchronized (_socket)
        {
            // Peek in non-blocking mode so that the call returns immediately
            _socket.blocking = false;
            result = _socket.receive(buffer, SocketFlags.PEEK);
            _socket.blocking = true;
        }

        // Socket.ERROR is negative, so this also covers the error case
        return result > 0;
    }

    /// Reads `size` characters from the socket.
    ///
    /// The returned slice refers to a thread-local buffer that is reused by
    /// the next call.
    /// Params:
    ///     size = number of characters to read
    /// Returns: `size` characters, or fewer if the peer closed the connection
    ///          before that many could be received
    char[] read(const size_t size)
    {
        static char[] buffer;
        buffer.length = size;
        size_t totalBytesReceived;

        while (totalBytesReceived < size)
        {
            ptrdiff_t bytesReceived;

            synchronized (_socket)
            {
                // Receive into the unfilled tail only, so that partial reads
                // neither overwrite earlier data nor consume more than `size`
                bytesReceived = _socket.receive(buffer[totalBytesReceived .. $]);
            }

            if (bytesReceived == Socket.ERROR)
            {
                // NOTE(review): errors are retried indefinitely, as before
                continue;
            }

            if (bytesReceived == 0)
            {
                // The peer closed the connection: return what was received
                buffer.length = totalBytesReceived;
                break;
            }

            totalBytesReceived += cast(size_t) bytesReceived;
        }

        return buffer;
    }

    /// Sends all of `data` over the socket, looping over partial sends.
    void write(const char[] data)
    {
        size_t totalBytesSent;

        while (totalBytesSent < data.length)
        {
            ptrdiff_t bytesSent;

            synchronized (_socket)
            {
                bytesSent = _socket.send(data[totalBytesSent .. $]);
            }

            // NOTE(review): errors are retried indefinitely, as before
            if (bytesSent != Socket.ERROR)
            {
                totalBytesSent += cast(size_t) bytesSent;
            }
        }
    }

    /// Nothing to flush: data is handed to the socket directly by `write`.
    void flush()
    {
    }
}