Project

General

Profile

scan_dev.c

The PBS log scanner program (writes to the development database). Build with: cc scan_dev.c -o scan_dev -lpq — Randolph Herber, 08/18/2011 02:17 PM

 
1
#include <stdio.h>
2
#include <stdlib.h>
3
#include <unistd.h>
4
#include <string.h>
5
#include <strings.h>
6
#include <time.h>
7
#include <sys/types.h>
8
#include <sys/stat.h>
9
#include <dirent.h>
10
#include <errno.h>
11
#include <regex.h>
12
#include <assert.h>
13
#include <sys/inotify.h>
14
#include <fcntl.h>
15
#include <libpq-fe.h>
16
#include <ctype.h>
17
#include <syslog.h>
18
#include <stdarg.h>
19
#include <signal.h>
20

    
21
/* Number of elements in an array object (gives a wrong answer for pointers). */
#define N(x) (sizeof(x)/sizeof(*(x)))

/* Binary size units. */
#define K (1024)
#define M (K*K)
#define G (K*M)

/* True when every bit set in bitfield is also set in field.
   Both arguments are parenthesized so that expression arguments such as
   has(mask, A | B) group as intended ('==' binds tighter than '|'). */
#define has(field,bitfield) (((field) & (bitfield)) == (bitfield))
28

    
29
/* Stream that receives diagnostic text from logging(); NULL disables it. */
static FILE *output = NULL;

/* When non-zero, logging() also forwards every message to syslog(3). */
static int doSyslog = 0;

/* Not referenced in this part of the file. */
static int logMask = 0;

/* Text written into log_file_positions.active when this process claims a
   log file (presumably this process's PID as decimal text — set elsewhere). */
static char programPIDText[16];

/* Directory holding the scanned log files; logOpen() prefixes it to names. */
static char const *log_dir = NULL;

/* Current cluster as maintained by getClusterId(): the numeric clusters.id,
   the same id as text (used as an SQL parameter), and the upper-cased name. */
static int clusterId = -1;
static char const *clusterIdText = NULL;
static char const *CLUSTER = NULL;
42

    
43
/* Terminate the process with exitCode.  Every fatal path in this file goes
   through this wrapper, so a shutdown hook could be added in one place. */
static void Exit(int const exitCode)
{
  exit(exitCode);
}
46

    
47
static void logging(int const priority, char const * const format, ...) {
48
  va_list ap;
49

    
50
  if(output) {
51
    va_start(ap,format);
52
    (void)vfprintf(output,format,ap);
53
    va_end(ap);
54
  }
55

    
56
  if(doSyslog) {
57
    va_start(ap,format);
58
    (void)vsyslog(priority,format,ap);
59
    va_end(ap);
60
  }
61
}
62

    
63
/* Printable names for the syslog(3) priority levels, most severe first. */
struct log_priority {
  char const * name;
  int const value;
} priorities[] = {
  { .name = "EMERG",   .value = LOG_EMERG   },
  { .name = "ALERT",   .value = LOG_ALERT   },
  { .name = "CRIT",    .value = LOG_CRIT    },
  { .name = "ERR",     .value = LOG_ERR     },
  { .name = "WARNING", .value = LOG_WARNING },
  { .name = "NOTICE",  .value = LOG_NOTICE  },
  { .name = "INFO",    .value = LOG_INFO    },
  { .name = "DEBUG",   .value = LOG_DEBUG   },
};

/* NOTE(review): LOG_* priorities are level numbers, not bit flags.  With the
   usual <syslog.h> values (LOG_EMERG=0 ... LOG_ERR=3) this OR is just LOG_ERR.
   If a mask of the four most severe levels was meant, LOG_UPTO(LOG_ERR)
   would express it — confirm against the users of `major` before changing. */
int const major = LOG_EMERG | LOG_ALERT | LOG_CRIT | LOG_ERR;
78

    
79
static char const * const connectionData =
80
  "dbname=jml_dev user=herber host=ds2 password=MagicMaster";
81

    
82
static PGconn *connection = 0;
83
static PGresult *result = 0;
84

    
85
/* Log the transaction state of `connection` at `priority`, preceded by a
   "--- tag ---" header line when tag is non-NULL. */
static void report(int priority,PGconn *connection,const char * const tag ) {
  if(tag != NULL) {
    logging(priority,"--- %s ---\n",tag);
  }
  PGTransactionStatusType const status = PQtransactionStatus(connection);
  switch(status) {
    case PQTRANS_IDLE:
      logging(priority,"idle\n");
      break;
    case PQTRANS_ACTIVE:
      logging(priority,"active\n");
      break;
    case PQTRANS_INTRANS:
      logging(priority,"in transaction\n");
      break;
    case PQTRANS_INERROR:
      logging(priority,"error\n");
      break;
    case PQTRANS_UNKNOWN:
      logging(priority,"unknown\n");
      break;
    default:
      logging(priority,"code %d\n",(int)status);
      break;
  }
}
97

    
98
static void command_report(
99
    int priority,
100
    const PGresult *result,const char * const tag,const char * const what) {
101
  if(tag || what) {
102
    logging(priority,"---");
103
    if(tag) logging(priority," %s",tag);
104
    if(what) logging(priority," %s",what);
105
    logging(priority," ---\n");
106
  }
107
  int code;
108
  switch(code = PQresultStatus(result)) {
109
  case PGRES_EMPTY_QUERY:    logging(priority,"empty command\n"); break;
110
  case PGRES_COMMAND_OK:     logging(priority,"command OK\n"); break;
111
  case PGRES_TUPLES_OK:      logging(priority,"command OK with results\n");
112
                             break;
113
  case PGRES_COPY_OUT:       logging(priority,"copy out started\n"); break;
114
  case PGRES_COPY_IN:        logging(priority,"copy in started\n"); break;
115
  case PGRES_BAD_RESPONSE:   logging(priority,"response not understood\n");
116
                             break;
117
  case PGRES_NONFATAL_ERROR: logging(priority,"command nonfatal error \n");
118
                             break;
119
  case PGRES_FATAL_ERROR:    logging(priority,"command fatal error\n"); break;
120
  default:                   logging(priority,"command code %d\n",code); break;
121
  }
122
  char *error_message = PQresultErrorMessage(result);
123
  if(error_message && strlen(error_message)) {
124
    logging(priority,"/%s/\n",error_message);
125
    logging(priority,"PG_DIAG_SEVERITY=%s\n",
126
      PQresultErrorField(result,PG_DIAG_SEVERITY));
127
    logging(priority,"PG_DIAG_SQLSTATE=%s\n",
128
      PQresultErrorField(result,PG_DIAG_SQLSTATE));
129
    logging(priority,"PG_DIAG_MESSAGE_DETAIL=%s\n",
130
      PQresultErrorField(result,PG_DIAG_MESSAGE_DETAIL));
131
    logging(priority,"PG_DIAG_MESSAGE_HINT=%s\n",
132
      PQresultErrorField(result,PG_DIAG_MESSAGE_HINT));
133
  }
134
}
135

    
136
static void clearResult(void) {
137
  if(result) PQclear(result);
138
  result = 0;
139
}
140

    
141
static void perform(char const * const);
142

    
143
static int checkup(
144
    PGresult * const result, char const * const tag, char const * const what) {
145
  int rows = 0;
146
  if(result) {
147
    if(PQresultStatus(result) == PGRES_TUPLES_OK) {
148
      rows = PQntuples(result);
149
    } else {
150
      command_report(LOG_ERR,result,tag,what);
151
      Exit(1);
152
    }
153
  } else {
154
    char message[256];
155
    snprintf(message,N(message),"in %s for %s",tag,what);
156
    logging(LOG_ERR,"no resultset %s\n",message);
157
    Exit(2);
158
  }
159
  return rows;
160
}
161

    
162
static void command_checkup(
163
    PGresult * const result, char const * const tag, char const * const what) {
164
  if(result) {
165
    if(PQresultStatus(result) != PGRES_COMMAND_OK) {
166
      command_report(LOG_ERR,result,tag,what);
167
      Exit(3);
168
    }
169
  } else {
170
    char message[256];
171
    snprintf(message,N(message),"in %s for %s",tag,what);
172
    logging(LOG_ERR,"no resultset %s\n",message);
173
    Exit(4);
174
  }
175
}
176

    
177
static int insert_checkup_with_transaction_restart(
178
    PGresult * const result, char const * const tag, char const * const what) {
179
  int rows = 0;
180
  if(result) {
181
    if(PQresultStatus(result) == PGRES_TUPLES_OK) {
182
      rows = PQntuples(result);
183
    } else {
184
      if((strcmp(PQresultErrorField(result,PG_DIAG_SEVERITY),"ERROR") == 0) &&
185
         (strcmp(PQresultErrorField(result,PG_DIAG_SQLSTATE),"23505") == 0)) {
186
        /* something else generated the row before we could insert it! */
187
        /* the value needed is the just inserted value. */
188
        rows = -1;
189
        /* the error above broke the transaction; restart the transaction */
190
        perform("rollback");
191
        perform("commit");
192
      } else {
193
        command_report(LOG_ERR,result,tag,what);
194
        Exit(5);
195
      }
196
    }
197
  } else {
198
    char message[256];
199
    snprintf(message,N(message),"in %s for %s",tag,what);
200
    logging(LOG_ERR,"no resultset %s\n",message);
201
    Exit(6);
202
  }
203
  return rows;
204
}
205

    
206
static int insert_checkup_without_transaction_restart(
207
    PGresult * const result, char const * const tag, char const * const what) {
208
  int rows = 0;
209
  if(result) {
210
    if(PQresultStatus(result) == PGRES_TUPLES_OK) {
211
      rows = PQntuples(result);
212
    } else {
213
      if((strcmp(PQresultErrorField(result,PG_DIAG_SEVERITY),"ERROR") == 0) &&
214
         (strcmp(PQresultErrorField(result,PG_DIAG_SQLSTATE),"23505") == 0)) {
215
        /* something else generated the row before we could insert it! */
216
        /* the value needed is the just inserted value. */
217
        rows = -1;
218
      } else {
219
        command_report(LOG_ERR,result,tag,what);
220
        Exit(7);
221
      }
222
    }
223
  } else {
224
    char message[256];
225
    snprintf(message,N(message),"in %s for %s",tag,what);
226
    logging(LOG_ERR,"no resultset %s\n",message);
227
    Exit(8);
228
  }
229
  return rows;
230
}
231

    
232
static void checkConnection(char const * const tag) {
233
  if(connection) {
234
    if(PQstatus(connection) != CONNECTION_OK) {
235
      logging(LOG_ERR,"connection failed when %s\n",tag);
236
      report(LOG_ERR,connection,0);
237
      Exit(9);
238
    }
239
  } else {
240
    logging(LOG_ERR,"no connection when %s\n",tag);
241
    Exit(10);
242
  }
243
}
244

    
245
static void perform(char const * const command) {
246
  char tag[128];
247
  snprintf(tag,N(tag),"perform(\"%s\")",command);
248
  int rows = 0;
249
  checkConnection(tag);
250
  clearResult();
251
  result = PQexecParams(connection,command,0,0,0,0,0,0);
252
  command_checkup(result,tag,"execution");
253
  clearResult();
254
}
255

    
256
/* debugging code; normally, not used */
257
static void dumpResult(
258
    int priority, PGresult *result,
259
    char const * const tag,char const * const what) {
260
  char buffer[8192];
261
  if(tag) logging(priority,"--- %s %s ---\n",tag,what);
262
  if(result) {
263
    if(PQresultStatus(result) == PGRES_TUPLES_OK) {
264
      int row, rows = PQntuples(result);
265
      int column, columns = PQnfields(result);
266
      if(rows) {
267
        for(row = 0; row < rows; ++row) {
268
          buffer[0] = '\0';
269
          for(column = 0; column < columns; ++column) {
270
            if(column) strncat(buffer," ",N(buffer));
271
            if(PQgetisnull(result,row,column)) {
272
              strncat(buffer,"null",N(buffer));
273
            } else {
274
              strncat(buffer,"[",N(buffer));
275
              strncat(buffer,PQgetvalue(result,row,column),N(buffer));
276
              strncat(buffer,"]",N(buffer));
277
            }
278
          }
279
          logging(priority,"%s\n",buffer);
280
        }
281
      } else {
282
        logging(priority,"no rows\n");
283
      }
284
    } else {
285
      command_report(priority,result,0,0);
286
    }
287
  } else {
288
    logging(priority,"no resultset\n");
289
  }
290
}
291

    
292
/* Build a single-quoted SQL string literal from `text`, doubling every
   embedded single quote.  The result is malloc()ed and ownership passes to
   the caller, who must free() it.  Exits the program (11) if memory cannot
   be obtained. */
static char const * const quoteText(char const * const text) {
  size_t quotes = 0;
  char const *scan = text;
  while(*scan != '\0') {
    if(*scan == '\'') quotes++;
    scan++;
  }
  /* two enclosing quotes + one doubling per quote + terminating NUL */
  char * const quoted = malloc(strlen(text) + quotes + 3);
  if(quoted == NULL) {
    logging(LOG_ERR,"unable to quote \"%s\"\n",text);
    Exit(11);
  }
  char *out = quoted;
  *out++ = '\'';
  for(scan = text; *scan != '\0'; scan++) {
    if(*scan == '\'') *out++ = '\'';
    *out++ = *scan;
  }
  *out++ = '\'';
  *out = '\0';
  return quoted;
}
312
/* From an examination of torque-2.5.6 server/accounting.c
313
   in acct_record, if a record to be written is received
314
   in a new day, then the old file is (f)closed and the
315
   new file is (f)opened. Fclose does a flush of buffers.
316

317
   The flush of buffers is sufficient to give the buffers
318
   to the kernel which will make the buffers available to
319
   other applications on the same node.
320

321
   in acct_open, the FILE* is set unbuffered, which means
322
   that each log record is flushed as soon as its new-line
323
   is recorded.
324

325
   localtime(3) is used to determine the current date.
326
*/
327

    
328
static char line[2*M] = {0};
329

    
330
/* One tracked log file.  Nodes form the singly linked list `logList`, kept
   in increasing strcmp() order of logName by insertLogFile(). */
typedef struct logNode {
  struct logNode *next;  /* following node in name order, or NULL */
  off_t knownEnd;        /* byte position loaded from log_file_positions */
  FILE *input;           /* open stream for the file, or NULL */
  char *logName;         /* heap-owned name, relative to log_dir */
} logElement, *logPointer ;

/* Head of the sorted list of tracked log files (NULL when empty). */
static logPointer logList = NULL;
338

    
339
static logPointer insertLogFile(char *logName) {
340
  logName = strdup(logName);
341
  if(!logName) {
342
    logging(LOG_ERR,"unable to avail log name storage\n");
343
    Exit(12);
344
  }
345
  logPointer *previous = &logList;
346
  logPointer location = logList;
347
  int condition = 1;
348
  while(location && (condition = strcmp(location->logName,logName)) < 0) {
349
    previous = &(location->next);
350
    location = location->next;
351
  }
352
  if(location && condition == 0) return location;
353
  if(location == 0 || condition > 0) {
354
    logPointer newElement = (logPointer)malloc(sizeof(logElement));
355
    if(!newElement) {
356
      logging(LOG_ERR,"unable to avail log name node storage\n");
357
      Exit(13);
358
    }
359
    newElement->next = location;
360
    newElement->logName = logName;
361
    newElement->knownEnd = 0;
362
    newElement->input = 0;
363
    *previous = newElement;
364
    /*  Check assertion that the list is in order.
365
        if(logList) {
366
          logPointer log = logList;
367
          while(log->next) {
368
            if(strcmp(log->logName,log->next->logName) >= 0) {
369
              logging(LOG_ERR,"out of order, %s vs %s\n",
370
                log->logName,log->next->logName);
371
              Exit(14);
372
            }
373
            log = log->next;
374
          }
375
        }
376
    */
377
    return newElement;
378
  }
379
  logging(LOG_ERR,"insertLogFile(%s) location=%#p, condition=%d\n",
380
          logName,location,condition);
381
  Exit(15);
382
}
383

    
384
/* Close log's stream (if open) and release this process's claim on the
   file's log_file_positions row.
   Inside one transaction the row for (logName, clusterIdText) is locked with
   "select ... for update".  If it exists, `active` is reset to 0 only when
   it still holds programPIDText.  logOpen() also calls this for files whose
   recorded owner is some other live scanner; that scanner's claim must be
   left intact, or two scanners would end up reading the same file.
   Exits the program on any database failure. */
static void logClose(logPointer log) {
  char tag[128];
  snprintf(tag,N(tag),"logClose(\"%s\")",log->logName);
  if(log->input) {
    fclose(log->input);
    log->input = 0;
  }
  checkConnection(tag);
  perform("begin");
  char const * const keys[] = {log->logName,clusterIdText};
  clearResult();
  result = PQexecParams(
    connection,
    "select active from log_file_positions "
    "where filename = $1 and cluster_id = $2 for update",
    N(keys),0,keys,0,0,0);
  int rows = checkup(result,tag,"initial select");
  if(rows > 0) {
    char const * const release[] = {log->logName,clusterIdText,programPIDText};
    clearResult();
    result = PQexecParams(connection,
      "update log_file_positions set active = 0 "
      "where filename = $1 and cluster_id = $2 and active = $3",
      N(release),0,release,0,0,0);
    command_checkup(result,tag,"update");
    clearResult();
  }
  perform("commit");
}
413

    
414
/* Unlink `log` from logList, close it (which also releases its database
   claim via logClose()) if its stream is open, and free the node and its
   name.  Does nothing when `log` is not in the list; previously the search
   ran off the end of the list and dereferenced NULL. */
static void deleteLogFile(logPointer log) {
  logPointer *link = &logList;
  while(*link && *link != log) {
    link = &((*link)->next);
  }
  if(*link == NULL) return;
  *link = log->next;
  if(log->input) logClose(log);
  free(log->logName);
  free(log);
}
428

    
429
static void deleteLogFileByName(char const * const logName) {
430
  logPointer log, next;
431
  for(log = logList; log; log = next) {
432
    next = log->next;
433
    if(strcmp(log->logName,logName) == 0) {
434
      deleteLogFile(log);
435
      return;
436
    }
437
  }
438
}
439

    
440
static long long getLogPosition(logPointer log,int * const active) {
441
  char tag[128];
442
  snprintf(tag,N(tag),"getLogPosition(\"%s\")",log->logName);
443
  long long size = 0;
444
  checkConnection(tag);
445
  char const *parameters[] = {log->logName,clusterIdText};
446
  perform("begin");
447
  clearResult();
448
  result = PQexecParams(
449
    connection,
450
    "select size, active from log_file_positions "
451
    "where filename = $1 and cluster_id = $2 for update",
452
    N(parameters),0,(const char * const * const)parameters,0,0,0);
453
  int rows = checkup(result,tag,"initial select");
454
  if(rows < 1) {
455
    clearResult();
456
    result = PQexecParams(
457
      connection,
458
      "insert into log_file_positions(filename,size,cluster_id,active) "
459
      "values($1,0,$2,0) "
460
      "returning size, active",
461
      N(parameters),0,(const char * const * const)parameters,0,0,0);
462
    rows = insert_checkup_with_transaction_restart(result,tag,"insert");
463
    if(rows == -1) {
464
      clearResult();
465
      result = PQexecParams(
466
        connection,
467
        "select size, active from log_file_positions "
468
        "where filename = $1 and cluster_id = $2 for update",
469
        N(parameters),0,(const char * const * const)parameters,0,0,0);
470
      rows = checkup(result,tag,"second select");
471
      if(rows < 1) {
472
        logging(LOG_ERR,"%s second select returned empty!\n",tag);
473
        perform("rollback");
474
        Exit(16);
475
      }
476
    }
477
    if(rows < 1) {
478
      logging(LOG_ERR,"%s \"good\" insert returned empty!\n",tag);
479
      perform("rollback");
480
      Exit(17);
481
    }
482
  }
483
  if(result) {
484
    char *v = PQgetvalue(result,0,0);
485
    char *c = 0;
486
    if(v) {
487
      size = strtoll(v,&c,0);
488
      if(*c) {
489
        logging(LOG_ERR,"%s nonnumeric size (%s)\n",v);
490
        perform("rollback");
491
        Exit(18);
492
      }
493
    } else {
494
      logging(LOG_ERR,"%s no size (%s)\n",v);
495
      perform("rollback");
496
      Exit(19);
497
    }
498
    v = PQgetvalue(result,0,1);
499
    if(v) {
500
      *active = (int)strtoll(v,&c,0);
501
      if(*c) {
502
        logging(LOG_ERR,"%s nonnumeric pid (%s)\n",v);
503
        perform("rollback");
504
        Exit(20);
505
      }
506
    } else {
507
      logging(LOG_ERR,"%s no pid (%s)\n",v);
508
      perform("rollback");
509
      Exit(21);
510
    }
511
    /* if the process no longer exists, then ignore it */
512
    if(*active && kill(*active,0) && errno == ESRCH) *active = 0;
513
    clearResult();
514
    char command[512];
515
    if(*active == 0) {
516
      strncpy(command,"update log_file_positions set active = ",N(command));
517
      strncat(command,programPIDText,N(command));
518
      strncat(command,"where filename = ",N(command));
519
      char const * const text = quoteText(log->logName);
520
      strncat(command,text,N(command));
521
      free((void *)text);
522
      strncat(command," and cluster_id = ",N(command));
523
      strncat(command,clusterIdText,N(command));
524
      result = PQexecParams(connection,command,0,0,0,0,0,0);
525
      command_checkup(result,tag,"update");
526
    }
527
  }
528
  perform("commit");
529
  clearResult();
530
  return size;
531
}
532

    
533
/* Prepare `log` for reading.
   Builds "<log_dir>/<logName>" and checks that it is still a regular file
   that has not shrunk below log->knownEnd.  It then opens the file (if not
   already open) and loads its recorded position into log->knownEnd through
   getLogPosition().
   *active receives the recorded owner PID when that process still exists;
   0 means the file has just been claimed for this process.  When *active is
   non-zero the file is closed again.
   Returns 0 when log->input is ready to read and 1 when the caller should
   skip the file.  When the file cannot be stat()ed or is not a regular file,
   the node is removed from logList and FREED; the caller must not use `log`
   after such a return. */
static int logOpen(logPointer log,int *const active) {
  char fileNameBuffer[8192];
  int const length =
    snprintf(fileNameBuffer,N(fileNameBuffer),"%s/%s",log_dir,log->logName);
  if(length < 0 || (size_t)length >= N(fileNameBuffer)) {
    logging(LOG_ERR,
            "unable to build absolute log file name.\n  reason: name too long\n");
    Exit(22);
  }
  struct stat logStat;
  if(stat(fileNameBuffer,&logStat)) {
    logging(LOG_DEBUG,"file \"%s\" is skipped as not statable\n  reason: %s\n",
            log->logName,strerror(errno));
    deleteLogFile(log);
    return 1;  /* log has been freed; do not fall through */
  }
  if(!S_ISREG(logStat.st_mode)) {
    logging(LOG_DEBUG,"\"%s\" is not a regular file\n",log->logName);
    deleteLogFile(log);
    return 1;
  }
  if(logStat.st_size < log->knownEnd) {
    logging(LOG_DEBUG,"%s compressed or shrunk (%lld < %lld)\n",
      log->logName,(long long)logStat.st_size,(long long)log->knownEnd);
    return 1;
  }
  if(!(log->input)) log->input = fopen(fileNameBuffer,"r");
  log->knownEnd = getLogPosition(log,active);
  if(*active) {
    logClose(log);
    return 1;
  }
  return log->input ? 0 : 1;
}
567

    
568
static void recordLogPosition(char const * const logName,long long const size) {
569
  char tag[128];
570
  snprintf(tag,N(tag),"recordLogPosition(\"%s\",%lld)",logName,size);
571
  char sizeText[64];
572
  snprintf(sizeText,N(sizeText),"%lld",size);
573
  char const * const parameters[] = {sizeText,logName,clusterIdText};
574
  checkConnection(tag);
575
  clearResult();
576
  result = PQexecParams(connection,
577
    "update log_file_positions set size = $1 "
578
    "where filename = $2 and cluster_id = $3",
579
    N(parameters),0,(const char * const * const)parameters,0,0,0);
580
  command_checkup(result,tag,"update");
581
  clearResult();
582
}
583

    
584
static logPointer findLogFile(char const * const logName) {
585
  logPointer location = logList;
586
  int condition = 1;
587
  while(location &&
588
    (condition = strcmp(location->logName,logName)) < 0) {
589
    location = location->next;
590
  }
591
  if(location && condition == 0) {
592
    return location;
593
  }
594
  return 0;
595
}
596

    
597
/* Compiled pattern (users are outside this view) and a flag whose use is
   also outside this view — presumably records whether logPattern must be
   regfree()d. */
static regex_t logPattern;
static int logPatternFree = 0;

/* inotify file descriptor; -1 until initialized.  The type is spelled out:
   implicit int was removed in C99. */
static int inotifyFd = -1;
601

    
602
/* inotify event bits paired with their symbolic names; showMask() uses this
   table to print a mask.  MASK_CODE stringizes the unexpanded argument, so
   each name is exactly the constant's spelling. */
#define MASK_CODE(flag) { #flag, flag }
static struct maskCode {
  char *name;
  int code;
} maskTable[] = {
  MASK_CODE(IN_ACCESS),
  MASK_CODE(IN_ATTRIB),
  MASK_CODE(IN_CLOSE_WRITE),
  MASK_CODE(IN_CLOSE_NOWRITE),
  MASK_CODE(IN_CREATE),
  MASK_CODE(IN_DELETE),
  MASK_CODE(IN_DELETE_SELF),
  MASK_CODE(IN_MODIFY),
  MASK_CODE(IN_MOVE_SELF),
  MASK_CODE(IN_MOVED_FROM),
  MASK_CODE(IN_MOVED_TO),
  MASK_CODE(IN_OPEN),
  MASK_CODE(IN_IGNORED),
  MASK_CODE(IN_ISDIR),
  MASK_CODE(IN_Q_OVERFLOW),
  MASK_CODE(IN_UNMOUNT),
};
#undef MASK_CODE
620

    
621
static showMask(int mask,FILE *destination) {
622
  int first = 1;
623
  int i;
624
  for(i = 0; i < N(maskTable); ++i) {
625
    if(has(mask,maskTable[i].code)) {
626
      if(first) {
627
        first = 0;
628
      } else {
629
        fputs("|",destination);
630
      }
631
      fputs(maskTable[i].name,destination);
632
    }
633
  }
634
}
635

    
636
/* Return a newly malloc()ed, NUL-terminated copy of the characters in
   [start, end); the caller owns the result.
   Exits the program with 23 if either pointer is NULL or end precedes start
   (a negative length would otherwise become a huge allocation size), and
   with 24 if memory cannot be obtained. */
static char *makeString(char const * const start,char const * const end) {
  if(!(start && end) || end < start) {
    logging(LOG_ERR,"invalid string ends!\n");
    Exit(23);
  }
  size_t const length = (size_t)(end - start);
  char * const string = malloc(length + 1);
  if(!string) {
    logging(LOG_ERR,"unable to make a string!\n");
    Exit(24);
  }
  memcpy(string,start,length);
  string[length] = '\0';
  return string;
}
652

    
653
/* A node name might not contain the cluster name! */
654
/* The clusterId from the job identification will be used instead. */
655
/* Log files (presently) are pure by job identification. (Aug 2011) */
656
static void getClusterId(char const * const cluster) {
657
  clearResult();
658
  char tag[100];
659
  char *work = strdup(cluster);
660
  if(!work) {
661
    logging(LOG_ERR,"unable to get cluster name storage\n");
662
    Exit(25);
663
  }
664
  { char *c;
665
    for(c = work; *c; ++c) if(isalpha(*c)) *c = toupper(*c);
666
  }
667
  if(CLUSTER && (strcmp(work,CLUSTER) == 0)) {
668
    free(work);
669
    return;
670
  }
671
  free((void *)CLUSTER);
672
  CLUSTER = work;
673
  snprintf(tag,N(tag),"querying clusters for name=%s",CLUSTER);
674
  char const * const parameters[] = {CLUSTER};
675
  checkConnection(tag);
676
  clearResult();
677
  result = PQexecParams(
678
    connection,
679
    "select id "
680
    "from clusters "
681
    "where name = $1",
682
    N(parameters),0,(const char * const * const)parameters,0,0,0);
683
  int rows = checkup(result,tag,"initial select");
684
  if(rows < 1) {
685
    clearResult();
686
    result = PQexecParams(
687
      connection,
688
      "insert into clusters(name) values($1) returning id",
689
      N(parameters),0,(const char * const * const)parameters,0,0,0);
690
    rows = insert_checkup_without_transaction_restart(result,tag,"insert");
691
    if(rows == -1) {
692
      clearResult();
693
      result = PQexecParams(
694
        connection,
695
        "select id "
696
        "from clusters "
697
        "where name = $1",
698
        N(parameters),0,(const char * const * const)parameters,0,0,0);
699
      rows = checkup(result,tag,"second select");
700
      if(rows < 1) {
701
        logging(LOG_ERR,
702
                "second 'clusters' select for %s returned empty!\n",
703
                CLUSTER);
704
        Exit(26);
705
      }
706
    }
707
    if(rows < 1) {
708
      logging(LOG_ERR,"\"good\" insert for %s returned empty!\n",CLUSTER);
709
      Exit(27);
710
    }
711
  }
712
  if(result) {
713
    clusterIdText = strdup(PQgetvalue(result,0,0));
714
    if(!clusterIdText) {
715
      logging(LOG_ERR,"unable to get cluster id text storage\n");
716
      Exit(28);
717
    }
718
    clusterId = atoi(clusterIdText);
719
  }
720
  clearResult();
721
}
722

    
723
/* Cache entry mapping a job detail name to its job_detail_names.id.
   Entries form the list `detailNames`, kept in increasing strcmp() order of
   detailName by insertDetailName(). */
typedef struct detailNameNode {
  struct detailNameNode *next;  /* following entry in name order, or NULL */
  char *detailName;             /* heap-owned copy of the name */
  int  id;                      /* cached database id */
} detailNameElement, *detailNamePointer ;

/* Head of the sorted detail-name cache (NULL when empty). */
static detailNamePointer detailNames = NULL;
730

    
731
static detailNamePointer insertDetailName(char const * const detailName) {
732
  detailNamePointer *previous = &detailNames;
733
  detailNamePointer location = detailNames;
734
  int condition = 1;
735
  while(location && (condition = strcmp(location->detailName,detailName)) < 0) {
736
    previous = &(location->next);
737
    location = location->next;
738
  }
739
  if(location && condition == 0) return location;
740
  if(location == 0 || condition > 0) {
741
    detailNamePointer newElement =
742
      (detailNamePointer)malloc(sizeof(detailNameElement));
743
    if(!newElement) {
744
      logging(LOG_ERR,"unable to avail detail name node storage\n");
745
      Exit(30);
746
    }
747
    newElement->next = location;
748
    newElement->detailName = strdup(detailName);
749
    if(!newElement->detailName) {
750
      logging(LOG_ERR,"unable to avail detail name storage\n");
751
      Exit(29);
752
    }
753
    *previous = newElement;
754
    /*  Check assertion that the list is in order.
755
        if(detailNames) {
756
          detailNamePointer element = detailNames;
757
          while(element->next) {
758
            if(strcmp(element->detailName,element->next->detailName) >= 0) {
759
              logging(LOG_ERR,"out of order, %s vs %s\n",
760
                element->detailName,element->next->detailName);
761
              Exit(31);
762
            }
763
            element = element->next;
764
          }
765
        }
766
    */
767
    return newElement;
768
  }
769
  logging(LOG_ERR,"insertDetailName(%s) location=%#p, condition=%d\n",
770
          detailName,location,condition);
771
  Exit(32);
772
}
773

    
774
static int getDetailNameId(char const * const name) {
775
  if(detailNames) {
776
    detailNamePointer current;
777
    for(current = detailNames; current; current = current -> next) {
778
      int result = strcmp(current->detailName,name);
779
      if(result > 0) break;
780
      if(result == 0) return current->id;
781
    }
782
  }
783
  int nameId = 0;
784
  clearResult();
785
  char tag[100];
786
  snprintf(tag,N(tag),"querying detail names for name=%s",name);
787
  char const * const parameters[] = {name};
788
  checkConnection(tag);
789
  clearResult();
790
  result = PQexecParams(
791
    connection,
792
    "select id "
793
    "from job_detail_names "
794
    "where name = $1",
795
    N(parameters),0,(const char * const * const)parameters,0,0,0);
796
  int rows = checkup(result,tag,"initial select");
797
  if(rows < 1) {
798
    clearResult();
799
    result = PQexecParams(
800
      connection,
801
      "insert into job_detail_names(name) values($1) returning id",
802
      N(parameters),0,(const char * const * const)parameters,0,0,0);
803
    rows = insert_checkup_without_transaction_restart(result,tag,"insert");
804
    if(rows == -1) {
805
      clearResult();
806
      result = PQexecParams(
807
        connection,
808
        "select id "
809
        "from job_detail_names "
810
        "where name = $1",
811
        N(parameters),0,(const char * const * const)parameters,0,0,0);
812
      rows = checkup(result,tag,"second select");
813
      if(rows < 1) {
814
        logging(LOG_ERR,
815
                "second 'names' select for %s returned empty!\n",
816
                CLUSTER);
817
        Exit(33);
818
      }
819
    }
820
    if(rows < 1) {
821
      logging(LOG_ERR,"\"good\" insert for %s returned empty!\n",name);
822
      Exit(34);
823
    }
824
  }
825
  if(result) {
826
    nameId = atoi(PQgetvalue(result,0,0));
827
  }
828
  clearResult();
829
  insertDetailName(name)->id = nameId;
830
  return nameId;
831
}
832

    
833
static int getNodeId(char const * const node) {
834
  int nodeId = 0;
835
  clearResult();
836
  char tag[100];
837
  snprintf(tag,N(tag),"querying nodes for name=%s",node);
838
  char const * const parameters[] = {node};
839
  checkConnection(tag);
840
  clearResult();
841
  result = PQexecParams(
842
    connection,
843
    "select id "
844
    "from nodes "
845
    "where name = $1",
846
    N(parameters),0,(const char * const * const)parameters,0,0,0);
847
  int rows = checkup(result,tag,"initial select");
848
  if(rows < 1) {
849
    clearResult();
850
    /* A node name might not contain the cluster name! */
851
    /* The clusterId from the job identification will be used instead. */
852
    /* Log files (presently) are pure by job identification. (Aug 2011) */
853
    char const * const parametersInsert[] = {clusterIdText,node};
854
    result = PQexecParams(
855
      connection,
856
      "insert into nodes(cluster_id,name) values($1,$2) returning id",
857
      N(parametersInsert),0,(const char * const * const)parametersInsert,0,0,0);
858
    rows = insert_checkup_without_transaction_restart(result,tag,"insert");
859
    if(rows == -1) {
860
      clearResult();
861
      result = PQexecParams(
862
        connection,
863
        "select id "
864
        "from nodes "
865
        "where name = $1",
866
        N(parameters),0,(const char * const * const)parameters,0,0,0);
867
      rows = checkup(result,tag,"second select");
868
      if(rows < 1) {
869
        logging(LOG_ERR,
870
                "second 'nodes' select for %s returned empty!\n",
871
                CLUSTER);
872
        Exit(35);
873
      }
874
    }
875
    if(rows < 1) {
876
      logging(LOG_ERR,"\"good\" insert for %s returned empty!\n",node);
877
      Exit(36);
878
    }
879
  }
880
  if(result) {
881
    nodeId = atoi(PQgetvalue(result,0,0));
882
  }
883
  clearResult();
884
  return nodeId;
885
}
886

    
887
/* State consulted at the start of getJobId(); presumably a one-entry cache
   of the last (job number, job manager) pair and its jobs.id — the rest of
   getJobId() is outside this view.
   NOTE(review): getJobId() compares memoJobManager with jobNumber and
   memoJobNumber with jobManager, which looks swapped — confirm. */
static char *memoJobNumber = NULL;
static char *memoJobManager = NULL;
static int memoJobId = 0;
890

    
891
static int getJobId(
892
    char const * const jobNumber, char const * const jobManager) {
893
  if(!(memoJobManager && strcmp(memoJobManager,jobNumber) == 0 &&
894
       memoJobNumber && strcmp(memoJobNumber,jobManager) == 0)) {
895
    clearResult();
896
    char tag[128];
897
    snprintf(tag,N(tag),"getJobId(%s,%s)",jobNumber,jobManager);
898
    char *number = strdup(jobNumber);
899
    char *element = 0;
900
    if(!number) {
901
      logging(LOG_ERR,"unable to avail working copy of job number storage\n");
902
      Exit(37);
903
    }
904
    char *begin = index(number,'[');
905
    if(begin) {
906
      char *end = index(begin+1,']');
907
      if(end) {
908
        begin[0] = '\0';
909
        end[0] = '\0';
910
        element = begin+1;
911
      }
912
    }
913
    if(element && (strlen(element) == 0)) element = 0;
914
    int rows;
915
    checkConnection(tag);
916
    clearResult();
917
    if(element) {
918
      char const * const parameters[] = {jobManager,number,element};
919
      result = PQexecParams(
920
        connection,
921
        "select id "
922
        "from jobs "
923
        "where job_manager = $1 and job_number = $2 and job_element = $3",
924
        N(parameters),0,(const char * const * const)parameters,0,0,0);
925
    } else {
926
      char const * const parameters[] = {jobManager,number};
927
      result = PQexecParams(
928
        connection,
929
        "select id "
930
        "from jobs "
931
        "where job_manager = $1 and job_number = $2 and job_element is null",
932
        N(parameters),0,(const char * const * const)parameters,0,0,0);
933
    }
934
    rows = checkup(result,tag,"initial select");
935
    if(rows < 1) {
936
      char clusterIdText[32];
937
      snprintf(clusterIdText,N(clusterIdText),"%d",clusterId);
938
      clearResult();
939
      if(element) {
940
        char const * const insertation[] =
941
          {jobManager,number,element,clusterIdText};
942
        result = PQexecParams(
943
          connection,
944
          "insert into jobs(job_manager,job_number,job_element,cluster_id) "
945
          "values($1,$2,$3,$4) returning id",
946
          N(insertation),0,(const char * const * const)insertation,0,0,0);
947
      } else {
948
        char const * const insertation[] = {jobManager,number,clusterIdText};
949
        result = PQexecParams(
950
          connection,
951
          "insert into jobs(job_manager,job_number,job_element,cluster_id) "
952
          "values($1,$2,null,$3) returning id",
953
          N(insertation),0,(const char * const * const)insertation,0,0,0);
954
      }
955
      rows = insert_checkup_without_transaction_restart(result,tag,"insert");
956
      if(rows == -1) {
957
        clearResult();
958
        if(element) {
959
          char const * const parameters[] = {jobManager,number,element};
960
          result = PQexecParams(
961
            connection,
962
            "select id "
963
            "from jobs "
964
            "where job_manager = $1 and job_number = $2 and job_element = $3",
965
            N(parameters),0,(const char * const * const)parameters,0,0,0);
966
        } else {
967
          char const * const parameters[] = {jobManager,number};
968
          result = PQexecParams(
969
            connection,
970
            "select id "
971
            "from jobs "
972
            "where job_manager = $1 "
973
              "and job_number = $2 "
974
              "and job_element is null",
975
            N(parameters),0,(const char * const * const)parameters,0,0,0);
976
        }
977
      }
978
      if(rows < 1) {
979
        logging(LOG_ERR,"\"good\" insert for %s.%s returned empty!\n",
980
                jobNumber,jobManager);
981
        Exit(38);
982
      }
983
    }
984
    if(result) {
985
      if(memoJobManager) free(memoJobManager);
986
      memoJobManager = strdup(jobManager);
987
      if(memoJobNumber) free(memoJobNumber);
988
      memoJobNumber = strdup(jobNumber);
989
      if(!(memoJobManager && memoJobNumber)) {
990
        logging(LOG_ERR,"unable to memoize job name \"%s.%s\"\n",
991
          jobNumber,jobManager);
992
        Exit(39);
993
      }
994
      memoJobId = atoi(PQgetvalue(result,0,0));
995
    }
996
    clearResult();
997
    free(number);
998
  }
999
  return memoJobId;
1000
}
1001

    
1002
static struct states { char * code; int id; } *jobStates = 0;
1003
static int jobStatesSize = 0;
1004

    
1005
static int getJobStatusId(char const * const code) {
1006
  if(!jobStates) {
1007
    int rows = 0;
1008
    checkConnection("getJobStatusId");
1009
    clearResult();
1010
    result = PQexecParams(
1011
      connection,
1012
      "select code, id from valid_job_states order by id",
1013
      0,0,0,0,0,0);
1014
    rows = checkup(result,"getJobStatusId","initial select");
1015
    if(rows == 0) {
1016
      logging(LOG_ERR,"valid_job_states table is empty.  Seed database!\n");
1017
      Exit(40);
1018
    }
1019
    jobStatesSize = rows;
1020
    jobStates = (struct states *)calloc(rows,sizeof(struct states));
1021
    int row;
1022
    for(row = 0; row < rows; ++row) {
1023
      jobStates[row].code = strdup(PQgetvalue(result,row,0));
1024
      if(!(jobStates[row].code)) {
1025
        logging(LOG_ERR,"unable to avail jobStates[row].code storage\n");
1026
        Exit(41);
1027
      }
1028
      jobStates[row].id = atoi(PQgetvalue(result,row,1));
1029
    }
1030
    clearResult();
1031
  }
1032
  int row;
1033
  for(row = 0; row < jobStatesSize; ++row) {
1034
    if(strcmp(jobStates[row].code,code) == 0) {
1035
      return jobStates[row].id;
1036
    }
1037
  }
1038
  logging(LOG_ERR,"unable to avail job status code '%s'\n",code);
1039
  Exit(42);
1040
}
1041

    
1042
/* Was hit by a "birthday surprise."  Log time and log file position
1043
   is not sufficient to make job logs unique!  Job id is required.
1044
*/
1045
static int getJobLogId(
1046
    char const * const jobIdText, int statusId, char const * const logTime,
1047
    long long const size, int *duplicate) {
1048
  clearResult();
1049
  *duplicate = 0;
1050
  char statusIdText[32];
1051
  snprintf(statusIdText,N(statusIdText),"%lld",(long long)statusId);
1052
  char sizeText[40];
1053
  snprintf(sizeText,N(sizeText),"%lld",size);
1054
  char tag[128];
1055
  snprintf(tag,N(tag),"getJobLogId(%s,%s,%s,%s,%s)",
1056
           jobIdText,statusIdText,logTime,sizeText);
1057
  char const * const parameters[] = {logTime,sizeText,jobIdText};
1058
  int rows = 0;
1059
  checkConnection(tag);
1060
  /* to_timestamp has to be used; to_date gives an always false result! */
1061
  clearResult();
1062
  result = PQexecParams(
1063
    connection,
1064
    "select id "
1065
    "from job_logs "
1066
    "where log_time = to_timestamp($1,'MM/DD/YYYY HH24:MI:SS') "
1067
      "and log_size = $2"
1068
      "and job_id = $3",
1069
    N(parameters),0,(const char * const * const)parameters,0,0,0);
1070
  rows = checkup(result,tag,"initial select");
1071
  if(rows > 1) {
1072
    logging(LOG_ERR,"%s log record is multiple (%d)\n",tag,rows);
1073
    Exit(43);
1074
  } else if(rows == 1) {
1075
    logging(LOG_DEBUG,"%s log record is duplicate\n",tag);
1076
    *duplicate = 1;
1077
    return -1;
1078
  } else {
1079
    clearResult();
1080
    char const * const insertation[] =
1081
      {jobIdText,statusIdText,logTime,sizeText};
1082
    clearResult();
1083
    result = PQexecParams(
1084
      connection,
1085
      "insert into job_logs(job_id,valid_job_state_id,log_time,log_size) "
1086
      "values($1,$2,$3,$4) returning id",
1087
      N(insertation),0,(const char * const * const)insertation,0,0,0);
1088
    rows = insert_checkup_without_transaction_restart(result,tag,"insert");
1089
    if(rows == -1) {
1090
      /* to_timestamp has to be used; to_date gives an always false result! */
1091
      clearResult();
1092
      result = PQexecParams(
1093
        connection,
1094
        "select id "
1095
        "from job_logs "
1096
        "where log_time = to_timestamp($1,'MM/DD/YYYY HH24:MI:SS') "
1097
          "and log_size = $2"
1098
          "and job_id = $3",
1099
        N(parameters),0,(const char * const * const)parameters,0,0,0);
1100
      rows = checkup(result,tag,"second select");
1101
      if(rows < 1) {
1102
        logging(LOG_ERR,"second 'job_logs' select for %s returned empty!\n",
1103
                tag);
1104
        Exit(44);
1105
      }
1106
    }
1107
  }
1108
  int jobLogId = atoi(PQgetvalue(result,0,0));
1109
  clearResult();
1110
  return jobLogId;
1111
}
1112

    
1113
static void insertJobDetail(
1114
    char const * const jobLogIdText,
1115
    char const * const name,char const * const field) {
1116
  clearResult();
1117
  char tag[128];
1118
  snprintf(tag,N(tag),"insertJobDetail(%s,%s)",jobLogIdText,name);
1119
  char detailNameId[16];
1120
  snprintf(detailNameId,N(detailNameId),"%d",getDetailNameId(name));
1121
  checkConnection(tag);
1122
  char const * const parameters[] = {jobLogIdText,detailNameId,field};
1123
  clearResult();
1124
  result = PQexecParams(
1125
    connection,
1126
    "insert into job_details(job_log_id,job_detail_name_id,value) "
1127
    "values($1,$2,$3) returning id",
1128
    N(parameters),0,(const char * const * const)parameters,0,0,0);
1129
  /* collisions can not occur in schema design; therefore, checkup is used */
1130
  if(checkup(result,tag,"insert") != 1) {
1131
    logging(LOG_ERR,"insertJobDetail failed, %s\n",tag);
1132
    Exit(45);
1133
  }
1134
}
1135

    
1136
static void insertJobNode(
1137
    char const * const jobLogIdText,
1138
    char const * const name,char const * const core) {
1139
  clearResult();
1140
  char tag[128];
1141
  snprintf(tag,N(tag),"in insertJobNode(%s,%s/%s)",jobLogIdText,name,core);
1142
  checkConnection(tag);
1143
  char nodeIdText[16];
1144
  snprintf(nodeIdText,N(nodeIdText),"%d",getNodeId(name));
1145
  char const * const parameters[] = {jobLogIdText,nodeIdText,core};
1146
  clearResult();
1147
  result = PQexecParams(
1148
    connection,
1149
    "insert into job_nodes(job_log_id,node_id,core) "
1150
    "values($1,$2,$3) returning id",
1151
    N(parameters),0,(const char * const * const)parameters,0,0,0);
1152
  /* collisions can not occur in schema design; therefore, checkup is used */
1153
  if(checkup(result,tag,"insert") != 1) {
1154
    logging(LOG_ERR,"insertJobNode failed, %s\n",tag);
1155
    Exit(46);
1156
  }
1157
}
1158

    
1159
/* Walk a '+'-separated list of "host/core" entries (the exec_host value)
   and record each pair via insertJobNode.  The host is everything up to the
   next '/', the core everything after it up to the next '+' or the end.
   Parsing stops at the first remaining text that contains no '/'. */
static void insertJobNodes(
    char const * const jobLogIdText,char const * const list) {
  char const * const stop = list + strlen(list);
  char const *cursor = list;
  while(cursor != stop) {
    char const * const slash = strchr(cursor,'/');
    if(!slash) {
      break;
    }
    char const * const coreStart = slash + 1;
    char const * const plus = strchr(coreStart,'+');
    char const * const coreEnd = plus ? plus : coreStart + strlen(coreStart);

    char *host = makeString(cursor,slash);
    char *core = makeString(coreStart,coreEnd);
    insertJobNode(jobLogIdText,host,core);
    free(host);
    free(core);

    cursor = plus ? plus + 1 : stop;
  }
}
1186

    
1187
static int processLogRecord(
1188
    char const * const record,long long const size,int *duplicate) {
1189
  char *p1 = index(record,';');
1190
  char *logTime = makeString(record,p1);
1191
  char *p2 = index(p1+1,';');
1192
  char *logStatus = makeString(p1+1,p2);
1193
  char *p3 = index(p2+1,';');
1194
  char *logJob = makeString(p2+1,p3);
1195
  char *logData = strdup(p3+1);
1196
  if(!logData) {
1197
    logging(LOG_ERR,"unable to duplicate a string!\n");
1198
    Exit(47);
1199
  }
1200
  char tag[128];
1201
  snprintf(tag,N(tag),"%s;%s;%s",logTime,logStatus,logJob);
1202
  char *p4 = index(logJob,'.');
1203
  /* job array job number format is ###[##]; therefore, string not integer */
1204
  char *logJobNumber = makeString(logJob,p4);
1205
  char *p5 = index(p4+1,'.');
1206
  char *logManager = makeString(p4+1,p5);
1207
  char *logCluster = strdup(logManager);
1208
  if(!logCluster) {
1209
    logging(LOG_ERR,"unable to avail logCluster storage\n");
1210
    Exit(48);
1211
  }
1212
  char *c;
1213
  for(c = logCluster; *c; ++c) { if(isdigit(*c)) { *c = '\0'; break; } }
1214
  int jobId = getJobId(logJobNumber,logManager);
1215
  char jobIdText[32];
1216
  snprintf(jobIdText,N(jobIdText),"%lld",(long long)jobId);
1217
  int statusId = getJobStatusId(logStatus);
1218
  int jobLogId = getJobLogId(jobIdText,statusId,logTime,size,duplicate);
1219
  char *user = 0;
1220
  char *group = 0;
1221
  char *account = 0;
1222
  int  wall_seconds = 0;
1223
  char *nodect = 0;
1224
  if(*duplicate) {
1225
    logging(LOG_DEBUG,"duplicate record! %s %lld\n",logJob,size);
1226
  } else {
1227
    char jobLogIdText[32];
1228
    snprintf(jobLogIdText,N(jobLogIdText),"%d",jobLogId);
1229
    char *location, *theEnd, *nextLocation;
1230
    for(location = logData, theEnd = logData + strlen(logData);
1231
        location != theEnd;
1232
        location = nextLocation) {
1233
      if(location[0] == ' ' || location[0] == '\n') {
1234
        nextLocation = location + 1;
1235
      } else {
1236
        char *equal = index(location,'=');
1237
        if(equal) {
1238
          char *name = makeString(location,equal);
1239
          char *space = index(equal+1,' ');
1240
          if(!space) space = index(equal+1,'\n');
1241
          if(!space) {
1242
            logging(LOG_DEBUG,"no field terminator: %s %s %lld",
1243
                    logCluster,logJobNumber,size);
1244
            nextLocation = theEnd;
1245
          }
1246
          char *field = makeString(equal+1,space);
1247
          nextLocation = space+1;
1248
          if(strcasecmp(name,"resources_used.walltime") == 0) {
1249
            char *c;
1250
            int seconds = 3600*((int)strtol(field,&c,0));
1251
            if(*c == ':') {
1252
              seconds += 60*((int)strtol(c+1,&c,0));
1253
              if(*c == ':') {
1254
                seconds += (int)strtol(c+1,&c,0);
1255
                if(*c == '\0') wall_seconds = seconds;
1256
              }
1257
            }
1258
          }
1259
          if(strcmp(name,"exec_host") != 0) {
1260
            insertJobDetail(jobLogIdText,name,field);
1261
            if(strcmp(name,"user") == 0) {
1262
              user = strdup(field);
1263
              if(!user) {
1264
                logging(LOG_ERR,"unable to avail user storage\n");
1265
                Exit(49);
1266
              }
1267
            } else if(strcmp(name,"group") == 0) {
1268
              group = strdup(field);
1269
              if(!group) {
1270
                logging(LOG_ERR,"unable to avail group storage\n");
1271
                Exit(50);
1272
              }
1273
            } else if(strcmp(name,"account") == 0) {
1274
              account = strdup(field);
1275
              if(!account) {
1276
                logging(LOG_ERR,"unable to avail account storage\n");
1277
                Exit(51);
1278
              }
1279
            } else if(strcasecmp(name,"Resource_List.nodect") == 0) {
1280
              nodect = strdup(field);
1281
              if(!nodect) {
1282
                logging(LOG_ERR,"unable to avail nodect storage\n");
1283
                Exit(52);
1284
              }
1285
            }
1286
          } else {
1287
            insertJobNodes(jobLogIdText,field);
1288
          }
1289
          free(name);
1290
          free(field);
1291
        } else {
1292
          location = theEnd;
1293
        }
1294
      }
1295
    }
1296
  }
1297
  { char command[1024];
1298
    char number[16];
1299
    char const *delimiter = "";
1300
    strcpy(command,"update jobs set");
1301
    int doUpdate = 0;
1302
    if(user) {
1303
      doUpdate = 1;
1304
      strncat(command,delimiter,N(command));
1305
      delimiter = ",";
1306
      strncat(command," user_name = ",N(command));
1307
      char const * const text = quoteText(user);
1308
      strncat(command,text,N(command));
1309
      free((void *)text);
1310
    }
1311
    if(group) {
1312
      doUpdate = 1;
1313
      strncat(command,delimiter,N(command));
1314
      delimiter = ",";
1315
      strncat(command," group_name = ",N(command));
1316
      char const * const text = quoteText(group);
1317
      strncat(command,text,N(command));
1318
      free((void *)text);
1319
    }
1320
    if(account) {
1321
      doUpdate = 1;
1322
      strncat(command,delimiter,N(command));
1323
      delimiter = ",";
1324
      strncat(command," account = ",N(command));
1325
      char const * const text = quoteText(account);
1326
      strncat(command,text,N(command));
1327
      free((void *)text);
1328
    }
1329
    if(nodect) {
1330
      doUpdate = 1;
1331
      strncat(command,delimiter,N(command));
1332
      delimiter = ",";
1333
      strncat(command," last_node_count = ",N(command));
1334
      char const * const text = quoteText(nodect);
1335
      strncat(command,text,N(command));
1336
      free((void *)text);
1337
    }
1338
    if(wall_seconds > 0) {
1339
      doUpdate = 1;
1340
      strncat(command,delimiter,N(command));
1341
      delimiter = ",";
1342
      strncat(command," wall_seconds = wall_seconds + ",N(command));
1343
      snprintf(number,N(number),"%d",wall_seconds);
1344
      strncat(command,number,N(command));
1345
    }
1346
    if(strcmp(logStatus,"S") == 0) {
1347
      doUpdate = 1;
1348
      strncat(command,delimiter,N(command));
1349
      delimiter = ",";
1350
      strncat(command," start_count = start_count + 1",N(command));
1351
      strncat(command,delimiter,N(command));
1352
      strncat(command," last_start = to_timestamp('",N(command));
1353
      strncat(command,logTime,N(command));
1354
      strncat(command,"','MM/DD/YYYY HH24:MI:SS')",N(command));
1355
    }
1356
    if(strcmp(logStatus,"E") == 0) {
1357
      doUpdate = 1;
1358
      strncat(command,delimiter,N(command));
1359
      delimiter = ",";
1360
      strncat(command," last_end = to_timestamp('",N(command));
1361
      strncat(command,logTime,N(command));
1362
      strncat(command,"','MM/DD/YYYY HH24:MI:SS')",N(command));
1363
    }
1364
    if(doUpdate) {
1365
      strncat(command," where id = ",N(command));
1366
      strncat(command,jobIdText,N(command));
1367
      /* logging(LOG_DEBUG,"%s;\n",command); */
1368
      checkConnection(tag);
1369
      clearResult();
1370
      result = PQexecParams(connection,command,0,0,0,0,0,0);
1371
      command_checkup(result,tag,"update job record");
1372
      clearResult();
1373
    }
1374
  }
1375
  free(logTime);
1376
  free(logStatus);
1377
  free(logJob);
1378
  free(logData);
1379
  free(logJobNumber);
1380
  free(logManager);
1381
  free(logCluster);
1382
  free(user);
1383
  free(group);
1384
  free(account);
1385
  free(nodect);
1386
  return clusterId;
1387
}
1388

    
1389
/* Consume input up to and including the next newline (or EOF) and return
   the resulting file position, or a negative value if it is unknown. */
static long long skipRestOfRecord(FILE * const input) {
  int ch;
  while((ch = getc(input)) != EOF && ch != '\n') {}
  return (long long)ftello(input);
}

/* Import the records appended to one tracked log file since log->knownEnd.
   Each record with at least three ';' is loaded in its own transaction,
   which is rolled back when processLogRecord flags it as a duplicate, and
   the position after it is stored with recordLogPosition.  Malformed or
   over-long records are logged and skipped.  A file that logOpen reports
   as active is closed untouched; a file that cannot be opened is deleted
   from tracking. */
static void processLogFile(logPointer const log) {
  clearResult();
  int active;
  logOpen(log,&active);
  if(active) {
    logClose(log);
    return;
  }
  if(!log->input) {
    logging(LOG_DEBUG,"unopenable file \"%s\" deleted\n",log->logName);
    deleteLogFile(log);
    return;
  }
  if(log->knownEnd && fseeko(log->input,log->knownEnd,SEEK_SET)) {
    logging(LOG_DEBUG,"\"%s\" unable to seek to %lld\n",
            log->logName,(long long)log->knownEnd);
    logClose(log);
    return;
  }
  while(fgets(line,N(line),log->input)) {
    long long const previousEnd = log->knownEnd;
    long long position = ftello(log->input);
    if(position < 0) {
      logging(LOG_DEBUG,"\"%s\" unable to determine current position\n",
              log->logName);
      logClose(log);
      return;
    }
    log->knownEnd = position;
    size_t const length = strlen(line);

    /* The record id is the "time;type;jobid" text before the third ';'. */
    char const *thirdSemicolon = 0;
    int semicolonCount = 0;
    size_t i;
    for(i = 0; i < length; ++i) {
      if(line[i] == ';' && ++semicolonCount == 3) {
        thirdSemicolon = line + i;
        break;
      }
    }
    int const idLength = thirdSemicolon ? (int)(thirdSemicolon - line) : 0;

    /* fgets stores at most N(line)-1 characters; a full buffer without a
       trailing newline means the record continues past it.  Skip the rest
       so it is not read back as separate (garbage) records. */
    if(length == N(line) - 1 && line[length-1] != '\n') {
      position = skipRestOfRecord(log->input);
      if(position < 0) {
        logging(LOG_DEBUG,"\"%s\" unable to determine current position\n",
                log->logName);
        logClose(log);
        return;
      }
      log->knownEnd = position;
      logging(LOG_DEBUG,"%s:%lld %.*s record is too long (limit %lld)\n",
              log->logName,previousEnd,idLength,line,
              (long long)(N(line) - 1));
      continue;
    }
    if(!thirdSemicolon) {
      logging(LOG_DEBUG,
              "%s:%lld record does not have at least 3 semicolons (%d)\n",
              log->logName,previousEnd,(int)length);
      continue;
    }

    int duplicate = 0;
    perform("begin");
    processLogRecord(line,previousEnd,&duplicate);
    recordLogPosition(log->logName,position);
    perform(duplicate ? "rollback" : "commit");
  }
  logClose(log);
}
1454

    
1455
static void close_old_log_files(void) {
1456
  time_t now_time = time(0);
1457
  struct tm *now = localtime(&now_time);
1458
  char today[16];
1459
  snprintf(today,N(today),"%04d%02d%02d",
1460
    1900+now->tm_year, 1+now->tm_mon, now->tm_mday);
1461
  logPointer log;
1462
  for(log = logList; log; log = log->next) {
1463
    if(log->input && (strcmp(log->logName,today) < 0)) logClose(log);
1464
  }
1465
}
1466

    
1467
static void cleanup(void) {
1468
  if(result) PQclear(result);
1469
  if(connection) PQfinish(connection);
1470
  if(logPatternFree) regfree(&logPattern);
1471
  logPointer log, next;
1472
  for(log = logList; log; log = next) {
1473
    next = log -> next;
1474
    deleteLogFile(log);
1475
  }
1476
}
1477

    
1478
/* Assumption: all log records in the log directory have
1479
   the same cluster name implied in the job identifier.
1480
   If this assumption is violated, then the program will fail.
1481
*/
1482

    
1483
static void extractClusterIdentification(void) {
1484
  logPointer log;
1485
  for(log = logList; log; log = log->next) {
1486
    char filename[8192];
1487
    strncpy(filename,log_dir,N(filename));
1488
    strncat(filename,"/",N(filename));
1489
    strncat(filename,log->logName,N(filename));
1490
    FILE *file = fopen(filename,"r");
1491
    if(file) {
1492
      while(fgets(line,N(line),file)) {
1493
        char *c;
1494
        c = index(line,';');
1495
        if(c) {
1496
          c = index(c+1,';');
1497
          if(c) {
1498
            c = index(c+1,'.');
1499
            if(c) {
1500
              ++c;
1501
              char *e;
1502
              for(e = c; *e && isalpha(*e); ++e) {}
1503
              if(e != c) {
1504
                c = makeString(c,e);
1505
                getClusterId(c);
1506
                free(c);
1507
                fclose(file);
1508
                return;
1509
              }
1510
            }
1511
          }
1512
        }
1513
      }
1514
      fclose(file);
1515
    }
1516
  }
1517
  logging(LOG_ERR,"unable to open and read any log file\n");
1518
  Exit(53);
1519
}
1520

    
1521
struct sigaction old = {0}, new = {0};
1522

    
1523
void theTerminator(int signal, siginfo_t *info, void *restorer) {
1524
  time_t now = time(0);
1525
  logging(LOG_ERR,"execution terminated by interrupt %s\n",ctime(&now));
1526
  Exit(54);
1527
}
1528

    
1529
int main(int argc, char **argv) {
1530
  snprintf(programPIDText,N(programPIDText),"%ld",getpid());
1531

    
1532
  if(atexit(cleanup)) {
1533
    logging(LOG_ERR,"unable to establish at exit cleanup\n");
1534
    Exit(55);
1535
  }
1536

    
1537
  new.sa_sigaction = &theTerminator;
1538
  new.sa_flags = SA_SIGINFO;
1539

    
1540
  if(sigaction(SIGINT,&new,&old)) {
1541
    logging(LOG_ERR,"failed to set interrupt signal handler, %s\n",
1542
            strerror(errno));
1543
    Exit(56);
1544
  }
1545
  
1546
  char* default_log_dir = "/var/spool/PBS/server_priv/accounting";
1547
  char errorBuffer[1024] = {0};
1548
  char *environ_log_dir = getenv("LOGDIR");
1549
  log_dir = environ_log_dir ? environ_log_dir : default_log_dir;
1550

    
1551
  char* default_prefix = "";
1552
  char prefixBuffer[1024] = {0};
1553
  char *environ_prefix = getenv("PREFIX");
1554
  char *prefix = environ_prefix ? environ_prefix : default_prefix;
1555

    
1556
  char *outputParm = getenv("OUTPUT");
1557

    
1558
  char *syslogParm = getenv("SYSLOG");
1559

    
1560
  char option = '\0';
1561
  int oops = 0;
1562
  while((option = getopt(argc,argv,"+h?l:p:s:o:")) != -1) {
1563
    switch(option) {
1564
    case 'l':
1565
      log_dir = optarg;
1566
      break;
1567
    case 'p':
1568
      prefix = optarg;
1569
      break;
1570
    case 's':
1571
      syslogParm = optarg;
1572
      break;
1573
    case 'o':
1574
      outputParm = optarg;
1575
      break;
1576
    case 'h':
1577
    case '?':
1578
    default:
1579
      oops = 1;
1580
      break;
1581
    }
1582
  }
1583
  
1584
  if(oops) {
1585
    char *name = argv[0];
1586
    char *c = rindex(name,'/');
1587
    if(c) name = c+1;
1588
    fprintf(stderr,
1589
            "Usage: %s [-l logDir] [-p logPrefix ] -s [lowSyslog] "
1590
            "-o [stderr|stdout]\n",
1591
            name);
1592
    fprintf(stderr,"\n");
1593
    fprintf(stderr,"default log directory:            %s\n",default_log_dir);
1594
    fprintf(stderr,"  may be set by LOGDIR shell environment value\n");
1595
    fprintf(stderr,"\n");
1596
    fprintf(stderr,"default log prefix:               (null prefix)\n");
1597
    fprintf(stderr,"  may be set by PREFIX shell environment value\n");
1598
    fprintf(stderr,"\n");
1599
    fprintf(stderr,"default low syslog message class: (no syslog output)\n");
1600
    fprintf(stderr,"  may be set by SYSLOG shell environment value\n");
1601
    fprintf(stderr,"  classes in descending order:\n");
1602
    fprintf(stderr,"\n");
1603
    fprintf(stderr,
1604
            "    EMERG, ALERT, CRIT, ERR, WARNING, NOTICE, INFO and DEBUG\n");
1605
    fprintf(stderr,"\n");
1606
    fprintf(stderr,"default output file:              (no std file output)\n");
1607
    fprintf(stderr,"  may be set by OUTPUT shell environment value\n");
1608
    fprintf(stderr,"  stdout or stderr may be requested\n");
1609
    fprintf(stderr,"-s and -o arguments are case independent.\n");
1610
    fprintf(stderr,"Program is intended to be runable by /etc/init.\n");
1611
    Exit(57);
1612
  }
1613

    
1614
  if(outputParm) {
1615
    if(strcasecmp(outputParm,"sysout") == 0) {
1616
      output = stdout;
1617
    } else if(strcasecmp(outputParm,"syserr") == 0) {
1618
      output = stderr;
1619
    } else {
1620
      output = stdout;
1621
    }
1622
  }
1623
  if(syslogParm) {
1624
    openlog("pbsscan", LOG_PID, LOG_USER);
1625
    logMask = major;
1626
    setlogmask(major);
1627
    int i;
1628
    for(i = 0; i < N(priorities); ++i) {
1629
      if(strcasecmp(syslogParm,priorities[i].name) == 0) break;
1630
    }
1631
    if(i < N(priorities)) {
1632
      int j;
1633
      for(j = 0; j <= i; ++j) logMask |= priorities[j].value;
1634
      setlogmask(logMask);
1635
    }
1636
  }
1637

    
1638
  DIR *logDir = opendir(log_dir);
1639
  if(!logDir) {
1640
    logging(LOG_ERR,"unable to open log directory, %s.\n  reason: %s\n",
1641
            log_dir,strerror(errno));
1642
    Exit(58);
1643
  }
1644
  char logRegExp[320] = "";
1645
  snprintf(logRegExp,N(logRegExp),"^%s2[0-9]{7}$",prefix);
1646
  int errCode = 0;
1647
  if((errCode = regcomp(&logPattern,logRegExp,REG_NOSUB|REG_EXTENDED))) {
1648
    regerror(errCode,&logPattern,errorBuffer,(size_t)(N(errorBuffer)));
1649
    logging(LOG_ERR,"unable to compile log pattern, %s.\n  reason: %s\n",
1650
            logRegExp,errorBuffer);
1651
    Exit(59);
1652
  } else {
1653
    logPatternFree = 1;
1654
  }
1655

    
1656
  connection = PQconnectdb(connectionData);
1657
  if(!connection) {
1658
    logging(LOG_ERR,"connection not allocated.\n");
1659
    Exit(60);
1660
  }
1661
  checkConnection("initial check");
1662

    
1663
  /* start the log directory watch before the initial load to catch all
1664
     changes to the log files.
1665
  */
1666

    
1667
  inotifyFd = inotify_init();
1668
  if(inotifyFd < 0) {
1669
    logging(LOG_DEBUG,"unable to open inotify fd\n  reason: %s\n",
1670
            strerror(errno));
1671
    return 1;
1672
  }
1673

    
1674
  int directoryWatch = inotify_add_watch(inotifyFd,log_dir,
1675
    IN_UNMOUNT|IN_DELETE_SELF|IN_MOVE_SELF|                   /* directory */
1676
    IN_CREATE|IN_MOVED_TO|IN_DELETE|IN_MOVED_FROM|IN_MODIFY); /* file */
1677
  if(directoryWatch < 0) {
1678
    logging(LOG_DEBUG,"unable to add logdir watch\n  reason: %s\n",
1679
            strerror(errno));
1680
    return 1;
1681
  }
1682

    
1683
  /* check existing log files in log directory */
1684

    
1685
  time_t now = time(0);
1686
  logging(LOG_DEBUG,"log directory scan start %s",ctime(&now));
1687
  struct dirent *directory = 0;
1688
  while(directory = readdir(logDir)) {
1689
    if(!regexec(&logPattern,directory->d_name,0,0,0)) {
1690
      (void)insertLogFile(directory->d_name);
1691
    }
1692
  }
1693
  closedir(logDir);
1694

    
1695
  extractClusterIdentification();
1696

    
1697
  now = time(0);
1698
  logging(LOG_DEBUG,"log directory scan end %s",ctime(&now));
1699
  logging(LOG_DEBUG,"static scan start %s",ctime(&now));
1700
  logPointer log, next;
1701
  for(log = logList; log; log = next) {
1702
    next = log -> next;
1703
    processLogFile(log);
1704
    close_old_log_files();
1705
  }
1706
  now = time(0);
1707
  logging(LOG_DEBUG,"static scan end %s",ctime(&now));
1708

    
1709
  /* inotify will not read an event unless the buffer is big enough to
1710
     hold the event.  On the other hand, if the buffer is big enough and
1711
     and multiple events will fit into it, then the multiple events are read.
1712
  */
1713

    
1714
  char watchBuffer[8192];
1715
  struct inotify_event *watch;
1716

    
1717
  logging(LOG_DEBUG,"dynamic scan start %s",ctime(&now));
1718
  while(1) {
1719
    int watchSize = read(inotifyFd,(void *)watchBuffer,(size_t)N(watchBuffer));
1720
    switch(watchSize) {
1721
    char *watchPos;
1722
    case -1:
1723
      if(errno != EAGAIN) {
1724
        logging(LOG_ERR,"problems reading inotify events\n  reason: %s\n",
1725
          strerror(errno));
1726
        Exit(61);
1727
      }
1728
      break;
1729
    case 0:
1730
      logging(LOG_ERR,"char[%d] watchBuffer is not large enough!\n",
1731
              N(watchBuffer));
1732
      Exit(62);
1733
    default:
1734
      watchPos = watchBuffer;
1735
      while((watchPos - watchBuffer) < watchSize) {
1736
        watch = (struct inotify_event *)watchPos;
1737
        if(has(watch->mask,IN_ISDIR)) {
1738
          if(has(watch->mask,IN_UNMOUNT)) {
1739
            logging(LOG_ERR,"the log directory file system was unmounted!\n");
1740
            Exit(63);
1741
          }
1742
          if(has(watch->mask,IN_DELETE_SELF)) {
1743
            logging(LOG_ERR,"the log directory itself was deleted!\n");
1744
            Exit(64);
1745
          }
1746
          if(has(watch->mask,IN_MOVE_SELF)) {
1747
            logging(LOG_ERR,"the log directory itself was moved!\n");
1748
            Exit(65);
1749
          }
1750
          continue;
1751
        }
1752
        if(       has(watch->mask,IN_CREATE) ||
1753
                  has(watch->mask,IN_MOVED_TO) ||
1754
                  has(watch->mask,IN_MODIFY)) {
1755
          if(watch->len != 0) {
1756
            if(!regexec(&logPattern,watch->name,0,0,0)) {
1757
              logPointer log = insertLogFile(watch->name);
1758
              if(log) {
1759
                processLogFile(log);
1760
                close_old_log_files();
1761
              } else {
1762
                logging(LOG_ERR,
1763
                        "on watch trigger, log insert failed for %s\n",
1764
                        watch->name);
1765
                Exit(66);
1766
              }
1767
            }
1768
          }
1769
        } else if(has(watch->mask,IN_DELETE) ||
1770
                  has(watch->mask,IN_MOVED_FROM)) {
1771
          if(watch->len != 0) deleteLogFileByName(watch->name);
1772
        }
1773
        watchPos += sizeof(*watch);
1774
        watchPos += watch->len;
1775
      }
1776
      break;
1777
    }
1778
  }
1779
  now = time(0);
1780
  logging(LOG_DEBUG,"dynamic scan end %s",ctime(&now));
1781
  return 0;
1782
}