r802: Remove renderfarmfsclient and renderfarmfsserver .h and .C from Makefile.am...
[cinelerra_cv/mob.git] / cinelerra / renderfarm.C
blob47f1529b580997e83bf8d19b189d2899a27cfcc6
1 #include "asset.h"
2 #include "brender.h"
3 #include "clip.h"
4 #include "defaults.h"
5 #include "edl.h"
6 #include "filesystem.h"
7 #include "filexml.h"
8 #include "language.h"
9 #include "mutex.h"
10 #include "packagedispatcher.h"
11 #include "preferences.h"
12 #include "render.h"
13 #include "renderfarm.h"
14 //#include "renderfarmfsserver.h"
15 #include "bctimer.h"
16 #include "transportque.h"
19 #include <arpa/inet.h>
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <netdb.h>
23 #include <netinet/in.h>
24 #include <stdio.h>
25 #include <string.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <sys/un.h>
29 #include <unistd.h>
34 RenderFarmServer::RenderFarmServer(ArrayList<PluginServer*> *plugindb, 
35         PackageDispatcher *packages,
36         Preferences *preferences,
37         int use_local_rate,
38         int *result_return,
39         int64_t *total_return,
40         Mutex *total_return_lock,
41         Asset *default_asset,
42         EDL *edl,
43         BRender *brender)
45         this->plugindb = plugindb;
46         this->packages = packages;
47         this->preferences = preferences;
48         this->use_local_rate = use_local_rate;
49         this->result_return = result_return;
50         this->total_return = total_return;
51         this->total_return_lock = total_return_lock;
52         this->default_asset = default_asset;
53         this->edl = edl;
54         this->brender = brender;
55         client_lock = new Mutex("RenderFarmServer::client_lock");
58 RenderFarmServer::~RenderFarmServer()
60         clients.remove_all_objects();
61         delete client_lock;
64 // Open connections to clients.
65 int RenderFarmServer::start_clients()
67         int result = 0;
69         for(int i = 0; i < preferences->get_enabled_nodes() && !result; i++)
70         {
71                 client_lock->lock("RenderFarmServer::start_clients");
72                 RenderFarmServerThread *client = new RenderFarmServerThread(plugindb, 
73                         this, 
74                         i);
75                 clients.append(client);
77                 result = client->start_loop();
78                 client_lock->unlock();
79 //usleep(100000);
80 // Fails to connect all without a delay
81         }
83         return result;
86 // The render farm must wait for all the clients to finish.
87 int RenderFarmServer::wait_clients()
89 //printf("RenderFarmServer::wait_clients 1\n");
90         clients.remove_all_objects();
91 //printf("RenderFarmServer::wait_clients 2\n");
92         return 0;
106 // Waits for requests from every client.
107 // Joins when the client is finished.
108 RenderFarmServerThread::RenderFarmServerThread(ArrayList<PluginServer*> *plugindb, 
109         RenderFarmServer *server, 
110         int number)
111  : Thread()
113         this->plugindb = plugindb;
114         this->server = server;
115         this->number = number;
116         socket_fd = -1;
117         frames_per_second = 0;
118         Thread::set_synchronous(1);
123 RenderFarmServerThread::~RenderFarmServerThread()
125 //printf("RenderFarmServerThread::~RenderFarmServerThread 1 %p\n", this);
126         Thread::join();
127 //printf("RenderFarmServerThread::~RenderFarmServerThread 1\n");
128         if(socket_fd >= 0) close(socket_fd);
129 //printf("RenderFarmServerThread::~RenderFarmServerThread 2\n");
133 int RenderFarmServerThread::start_loop()
135         int result = 0;
136         char *hostname = server->preferences->get_node_hostname(number);
137 //printf("RenderFarmServerThread::start_loop 1\n");
139 // Open file for master node
140         if(hostname[0] == '/')
141         {
142                 if((socket_fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
143                 {
144                         perror(_("RenderFarmServerThread::start_loop: socket\n"));
145                         result = 1;
146                 }
147                 else
148                 {
149                         struct sockaddr_un addr;
150                         addr.sun_family = AF_FILE;
151                         strcpy(addr.sun_path, hostname);
152                         int size = (offsetof(struct sockaddr_un, sun_path) + 
153                                 strlen(hostname) + 1);
155 // The master node is always created by BRender.  Keep trying for 30 seconds.
157 #define ATTEMPT_DELAY 100000
158                         int done = 0;
159                         int attempt = 0;
160 //printf("RenderFarmServerThread::start_loop 2 %s\n", hostname);
161                         do
162                         {
163 //printf("RenderFarmServerThread::start_loop 3\n");
164                                 if(connect(socket_fd, (struct sockaddr*)&addr, size) < 0)
165                                 {
166                                         attempt++;
167                                         if(attempt > 30000000 / ATTEMPT_DELAY)
168                                         {
169 //printf("RenderFarmServerThread::start_loop 4 %s\n", hostname);
170                                                 fprintf(stderr, _("RenderFarmServerThread::start_loop: %s: %s\n"), 
171                                                         hostname, 
172                                                         strerror(errno));
173                                                 result = 1;
174                                         }
175                                         else
176                                                 usleep(ATTEMPT_DELAY);
177 //printf("RenderFarmServerThread::start_loop 5 %s\n", hostname);
178                                 }
179                                 else
180                                         done = 1;
181                         }while(!result && !done);
182 //printf("RenderFarmServerThread::start_loop 6\n");
183                 }
184         }
185         else
186 // Open socket
187         {
188                 if((socket_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
189                 {
190                         perror(_("RenderFarmServerThread::start_loop: socket"));
191                         result = 1;
192                 }
193                 else
194                 {
195 // Open port
196                         struct sockaddr_in addr;
197                         struct hostent *hostinfo;
198                         addr.sin_family = AF_INET;
199                         addr.sin_port = htons(server->preferences->get_node_port(number));
200                         hostinfo = gethostbyname(hostname);
201                         if(hostinfo == NULL)
202                 {
203                         fprintf(stderr, _("RenderFarmServerThread::start_loop: unknown host %s.\n"), 
204                                         server->preferences->get_node_hostname(number));
205                         result = 1;
206                 }
207                         else
208                         {
209                                 addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;   
211                                 if(connect(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
212                                 {
213                                         fprintf(stderr, _("RenderFarmServerThread::start_loop: %s: %s\n"), 
214                                                 server->preferences->get_node_hostname(number), 
215                                                 strerror(errno));
216                                         result = 1;
217                                 }
218                         }
219                 }
220         }
221 //printf("RenderFarmServerThread::start_loop 7\n");
223         if(!result) Thread::start();
225         return result;
229 int RenderFarmServerThread::read_socket(int socket_fd, char *data, int len, int timeout)
231         int bytes_read = 0;
232         int offset = 0;
233 //timeout = 0;
235 //printf("RenderFarmServerThread::read_socket 1\n");
236         while(len > 0 && bytes_read >= 0)
237         {
238                 int result = 0;
239                 if(timeout > 0)
240                 {
241                         fd_set read_fds;
242                         struct timeval tv;
244                         FD_ZERO(&read_fds);
245                         FD_SET(socket_fd, &read_fds);
246                         tv.tv_sec = timeout;
247                         tv.tv_usec = 0;
249                         result = select(socket_fd + 1, 
250                                 &read_fds, 
251                                 0, 
252                                 0, 
253                                 &tv);
254                         FD_ZERO(&read_fds);
255 //printf("RenderFarmServerThread::read_socket 1 %d\n", result);
256                 }
257                 else
258                         result = 1;
260                 if(result)
261                 {
262                         bytes_read = read(socket_fd, data + offset, len);
263                         if(bytes_read > 0)
264                         {
265                                 len -= bytes_read;
266                                 offset += bytes_read;
267                         }
268                         else
269                         {
270 //printf("RenderFarmServerThread::read_socket got 0 len=%d\n", len);
271                                 break;
272                         }
273                 }
274                 else
275                 {
276 printf("RenderFarmServerThread::read_socket timed out. len=%d\n", len);
277                         break;
278                 }
279         }
280 //printf("RenderFarmServerThread::read_socket 2\n");
282         return offset;
285 int RenderFarmServerThread::write_socket(int socket_fd, char *data, int len, int timeout)
287         int result = 0;
288         if(timeout > 0)
289         {
290                 fd_set write_fds;
291                 struct timeval tv;
292                 FD_ZERO(&write_fds);
293                 FD_SET(socket_fd, &write_fds);
294                 tv.tv_sec = timeout;
295                 tv.tv_usec = 0;
296 //printf("RenderFarmServerThread::write_socket 1\n");
297                 result = select(socket_fd + 1, 
298                                 0, 
299                                 &write_fds, 
300                                 0, 
301                                 &tv);
302 //printf("RenderFarmServerThread::write_socket 2\n");
303                 FD_ZERO(&write_fds);
304                 if(!result)
305                 {
306 printf("RenderFarmServerThread::write_socket 1 socket timed out. len=%d\n", len);
307                         return 0;
308                 }
309         }
311         return write(socket_fd, data, len);
314 int RenderFarmServerThread::read_socket(char *data, int len, int timeout)
316         return read_socket(socket_fd, data, len, timeout);
319 int RenderFarmServerThread::write_socket(char *data, int len, int timeout)
321         return write_socket(socket_fd, data, len, timeout);
324 void RenderFarmServerThread::reallocate_buffer(int size)
326         if(buffer && buffer_allocated < size)
327         {
328                 delete [] buffer;
329                 buffer = 0;
330         }
332         if(!buffer && size)
333         {
334                 buffer = new unsigned char[size];
335                 buffer_allocated = size;
336         }
339 void RenderFarmServerThread::run()
341 // Wait for requests
342         unsigned char header[5];
343         int done = 0;
346         buffer = 0;
347         buffer_allocated = 0;
348 //      fs_server = new RenderFarmFSServer(this);
349 //      fs_server->initialize();
350         while(!done)
351         {
353 //printf("RenderFarmServerThread::run 1\n");
354 // Wait for requests.
355 // Requests consist of request ID's and accompanying buffers.
356 // Get request ID.
357                 if(read_socket(socket_fd, (char*)header, 5, -1) != 5)
358                 {
359                         done = 1;
360                         continue;
361                 }
363                 int request_id = header[0];
364                 int64_t request_size = (((u_int32_t)header[1]) << 24) |
365                                                         (((u_int32_t)header[2]) << 16) |
366                                                         (((u_int32_t)header[3]) << 8)  |
367                                                         (u_int32_t)header[4];
369 //printf("RenderFarmServerThread::run 2 %d %lld\n", request_id, request_size);
370                 reallocate_buffer(request_size);
372 // Get accompanying buffer
373                 if(read_socket((char*)buffer, request_size, RENDERFARM_TIMEOUT) != request_size)
374                 {
375                         done = 1;
376                         continue;
377                 }
379                 switch(request_id)
380                 {
381                         case RENDERFARM_PREFERENCES:
382                                 send_preferences();
383                                 break;
384                         
385                         case RENDERFARM_ASSET:
386                                 send_asset();
387                                 break;
388                         
389                         case RENDERFARM_EDL:
390                                 send_edl();
391                                 break;
392                         
393                         case RENDERFARM_PACKAGE:
394                                 send_package(buffer);
395                                 break;
396                         
397                         case RENDERFARM_PROGRESS:
398                                 set_progress(buffer);
399                                 break;
401                         case RENDERFARM_SET_RESULT:
402                                 set_result(buffer);
403                                 break;
405                         case RENDERFARM_SET_VMAP:
406                                 set_video_map(buffer);
407                                 break;
409                         case RENDERFARM_GET_RESULT:
410                                 get_result();
411                                 break;
413                         case RENDERFARM_DONE:
414                                 done = 1;
415                                 break;
418                         default:
419 //                              if(!fs_server->handle_request(request_id, request_size, (unsigned char*)buffer))
420                                 {
421                                         printf(_("RenderFarmServerThread::run: unknown request %02x\n"), request_id);
422                                 }
423                                 break;
424                 }
425 //printf("RenderFarmServerThread::run 10 %d %lld\n", request_id, request_size);
426         }
427         
428         if(buffer) delete [] buffer;
429 //      delete fs_server;
432 int RenderFarmServerThread::write_string(int socket_fd, char *string)
434         unsigned char *datagram;
435         int i, len;
436         i = 0;
438         len = strlen(string) + 1;
439         datagram = new unsigned char[len + 4];
440         STORE_INT32(len);
441         memcpy(datagram + i, string, len);
442         write_socket(socket_fd, (char*)datagram, len + 4, RENDERFARM_TIMEOUT);
443 //printf("RenderFarmServerThread::write_string %02x%02x%02x%02x\n",
444 //      datagram[0], datagram[1], datagram[2], datagram[3]);
446         delete [] datagram;
449 void RenderFarmServerThread::send_preferences()
451         Defaults defaults;
452         char *string;
454         server->preferences->save_defaults(&defaults);
455         defaults.save_string(string);
456         write_string(socket_fd, string);
458         delete [] string;
461 void RenderFarmServerThread::send_asset()
463         Defaults defaults;
464         char *string1;
466 // The asset must be sent in two segments.
467 // One segment is stored in the EDL and contains decoding information.
468 // One segment is stored in the asset and contains encoding information.
469         server->default_asset->save_defaults(&defaults, 
470                 0, 
471                 1,
472                 1,
473                 1,
474                 1,
475                 1);
476         defaults.save_string(string1);
478         FileXML file;
479         server->default_asset->write(&file, 0, 0);
480         file.terminate_string();
482         write_string(socket_fd, string1);
483         write_string(socket_fd, file.string);
484         delete [] string1;
488 void RenderFarmServerThread::send_edl()
490         FileXML file;
492 // Save the XML
493         server->edl->save_xml(plugindb,
494                 &file, 
495                 0,
496                 0,
497                 0);
498         file.terminate_string();
499 //printf("RenderFarmServerThread::send_edl\n%s\n\n", file.string);
501         write_string(socket_fd, file.string);
502 //printf("RenderFarmServerThread::send_edl 2\n");
506 void RenderFarmServerThread::send_package(unsigned char *buffer)
508         this->frames_per_second = (double)((((u_int32_t)buffer[0]) << 24) |
509                 (((u_int32_t)buffer[1]) << 16) |
510                 (((u_int32_t)buffer[2]) << 8)  |
511                 ((u_int32_t)buffer[3])) / 
512                 65536.0;
514 //printf("RenderFarmServerThread::send_package 1 %f\n", frames_per_second);
515         RenderPackage *package = 
516                 server->packages->get_package(frames_per_second, 
517                         number, 
518                         server->use_local_rate);
520 //printf("RenderFarmServerThread::send_package 2\n");
522         char datagram[BCTEXTLEN];
524 // No more packages
525         if(!package)
526         {
527 //printf("RenderFarmServerThread::send_package 1\n");
528                 datagram[0] = datagram[1] = datagram[2] = datagram[3] = 0;
529                 write_socket(datagram, 4, RENDERFARM_TIMEOUT);
530         }
531         else
532 // Encode package
533         {
534 //printf("RenderFarmServerThread::send_package 10\n");
535                 int i = 4;
536                 strcpy(&datagram[i], package->path);
537                 i += strlen(package->path);
538                 datagram[i++] = 0;
540                 STORE_INT32(package->audio_start);
541                 STORE_INT32(package->audio_end);
542                 STORE_INT32(package->video_start);
543                 STORE_INT32(package->video_end);
544                 int use_brender = (server->brender ? 1 : 0);
545                 STORE_INT32(use_brender);
547                 int len = i;
548                 i = 0;
549                 STORE_INT32(len - 4);
551                 write_socket(datagram, len, RENDERFARM_TIMEOUT);
552         }
556 void RenderFarmServerThread::set_progress(unsigned char *buffer)
558         server->total_return_lock->lock("RenderFarmServerThread::set_progress");
559         *server->total_return += (int64_t)(((u_int32_t)buffer[0]) << 24) |
560                                                                                         (((u_int32_t)buffer[1]) << 16) |
561                                                                                         (((u_int32_t)buffer[2]) << 8)  |
562                                                                                         ((u_int32_t)buffer[3]);
563         server->total_return_lock->unlock();
566 int RenderFarmServerThread::set_video_map(unsigned char *buffer)
568         if(server->brender)
569         {
570                 server->brender->set_video_map((int64_t)(((u_int32_t)buffer[0]) << 24) |
571                                                         (((u_int32_t)buffer[1]) << 16) |
572                                                         (((u_int32_t)buffer[2]) << 8)  |
573                                                         ((u_int32_t)buffer[3]),
574                                                         (int64_t)(((u_int32_t)buffer[4]) << 24) |
575                                                         (((u_int32_t)buffer[5]) << 16) |
576                                                         (((u_int32_t)buffer[6]) << 8)  |
577                                                         ((u_int32_t)buffer[7]));
578                 char return_value[1];
579                 return_value[0] = 0;
580                 write_socket(return_value, 1, RENDERFARM_TIMEOUT);
581                 return 0;
582         }
583         return 1;
587 void RenderFarmServerThread::set_result(unsigned char *buffer)
589 //printf("RenderFarmServerThread::set_result %p\n", buffer);
590         if(!*server->result_return)
591                 *server->result_return = buffer[0];
595 void RenderFarmServerThread::get_result()
597         unsigned char data[1];
598         data[0] = *server->result_return;
599         write_socket((char*)data, 1, RENDERFARM_TIMEOUT);