* Fix long-undiscovered bug whereby certain LARGE or HUGE tables could have
[interchange.git] / lib / Vend / Table / Common.pm
1 # Vend::Table::Common - Common access methods for Interchange databases
2 #
3 # Copyright (C) 2002-2016 Interchange Development Group
4 # Copyright (C) 1996-2002 Red Hat, Inc.
5 #
6 # This program was originally based on Vend 0.2 and 0.3
7 # Copyright 1995 by Andrew M. Wilcox <amw@wilcoxsolutions.com>
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 2 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public
20 # License along with this program; if not, write to the Free
21 # Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
22 # MA  02110-1301  USA.
23
24 $VERSION = '2.52';
25 use strict;
26
27 package Vend::Table::Common;
28 require Vend::DbSearch;
29 require Vend::TextSearch;
30 require Vend::CounterFile;
31 no warnings qw(uninitialized numeric);
32 use Symbol;
33 use Vend::Util;
34
35 our $Has_Encode = 0;
36
37 if ($ENV{MINIVEND_DISABLE_UTF8}) {
38         # stub routines to pass-thru data if disabled
39         *encode_utf8 = sub {@_};
40         *decode_utf8 = sub {@_};
41 }
42 else {
43         require Encode;
44         import Encode qw( encode_utf8 decode_utf8 );
45         $Has_Encode = 1;
46 }
47
48 use Exporter;
49 use vars qw($Storable $VERSION @EXPORT @EXPORT_OK);
50 @EXPORT = qw(create_columns import_ascii_delimited import_csv config columns);
51 @EXPORT_OK = qw(import_quoted read_quoted_fields);
52
53 use vars qw($FILENAME
54                         $COLUMN_NAMES
55                         $COLUMN_INDEX
56                         $KEY_INDEX
57                         $TIE_HASH
58                         $DBM
59                         $EACH
60                         $CONFIG);
61 (
62         $CONFIG,
63         $FILENAME,
64         $COLUMN_NAMES,
65         $COLUMN_INDEX,
66         $KEY_INDEX,
67         $TIE_HASH,
68         $DBM,
69         $EACH,
70         ) = (0 .. 7);
71
72 # See if we can do Storable
73 BEGIN {
74         eval {
75                 die unless $ENV{MINIVEND_STORABLE_DB};
76                 require Storable;
77                 $Storable = 1;
78         };
79 }
80
81 my @Hex_string;
82 {
83     my $i;
84     foreach $i (0..255) {
85         $Hex_string[$i] = sprintf("%%%02X", $i);
86     }
87 }
88
89 sub create_columns {
90         my ($columns, $config) = @_;
91         $config = {} unless $config;
92     my $column_index = {};
93         my $key;
94 #::logDebug("create_columns: " . ::uneval($config));
95
96         if($config->{KEY}) {
97                 $key = $config->{KEY};
98         }
99         elsif (! defined $config->{KEY_INDEX}) {
100                 $config->{KEY_INDEX} = 0;
101                 $config->{KEY} = $columns->[0];
102         }
103     my $i;
104         my $alias = $config->{FIELD_ALIAS} || {};
105 #::logDebug("field_alias: " . ::uneval($alias)) if $config->{FIELD_ALIAS};
106     for ($i = 0;  $i < @$columns;  ++$i) {
107         $column_index->{$columns->[$i]} = $i;
108                 defined $alias->{$columns->[$i]}
109                         and $column_index->{ $alias->{ $columns->[$i] } } = $i;
110                 next unless defined $key and $key eq $columns->[$i];
111                 $config->{KEY_INDEX} = $i;
112                 undef $key;
113 #::logDebug("set KEY_INDEX to $i: " . ::uneval($config));
114     }
115
116     die errmsg(
117                         "Cannot find key column %s in %s (%s): %s",
118                         $config->{KEY},
119                         $config->{name},
120                         $config->{file},
121                         $!,
122             ) unless defined $config->{KEY_INDEX};
123
124         return $column_index;
125 }
126
127 sub separate_definitions {
128         my ($options, $fields) = @_;
129         for(@$fields) {
130 #::logDebug("separating '$_'");
131                 next unless s/\s+(.*)//;
132 #::logDebug("needed separation: '$_'");
133                 my $def = $1;
134                 my $fn = $_;
135                 unless(defined $options->{COLUMN_DEF}{$fn}) {
136                         $options->{COLUMN_DEF}{$fn} = $def;
137                 }
138         }
139         return;
140 }
141
142 sub clear_lock {
143         my $s = shift;
144         return unless $s->[$CONFIG]{IC_LOCKING};
145         if($s->[$CONFIG]{_lock_handle}) {
146                 close $s->[$CONFIG]{_lock_handle};
147                 delete $s->[$CONFIG]{_lock_handle};
148         }
149 }
150
151 sub lock_table {
152         my $s = shift;
153         return unless $s->[$CONFIG]{IC_LOCKING};
154         my $lockhandle;
155         if(not $lockhandle = $s->[$CONFIG]{_lock_handle}) {
156                 my $lf = $s->[$CONFIG]{file} . '.lock';
157                 unless($lf =~ m{/}) {
158                         $lf = ($s->[$CONFIG]{dir} || $Vend::Cfg->{ProductDir}) . "/$lf";
159                 }
160                 $lockhandle = gensym;
161                 $s->[$CONFIG]{_lock_file} = $lf;
162                 $s->[$CONFIG]{_lock_handle} = $lockhandle;
163                 open $lockhandle, ">> $lf"
164                         or die errmsg("Cannot lock table %s (%s): %s", $s->[$CONFIG]{name}, $lf, $!);
165         }
166 #::logDebug("lock handle=$lockhandle");
167         Vend::Util::lockfile($lockhandle);
168 }
169
170 sub unlock_table {
171         my $s = shift;
172         return unless $s->[$CONFIG]{IC_LOCKING};
173         Vend::Util::unlockfile($s->[$CONFIG]{_lock_handle});
174 }
175
176 sub stuff {
177     my ($val) = @_;
178     $val = encode_utf8($val)
179         if $Has_Encode && ($::Variable->{MV_UTF8} || $Global::Variable->{MV_UTF8});
180     $val =~ s,([\t\%]),$Hex_string[ord($1)],eg;
181     return $val;
182 }
183
184 sub unstuff {
185     my ($val) = @_;
186     $val =~ s,%(..),chr(hex($1)),eg;
187     $val = decode_utf8($val)
188         if $Has_Encode && ($::Variable->{MV_UTF8} || $Global::Variable->{MV_UTF8});
189     return $val;
190 }
191
192 sub autonumber {
193         my $s = shift;
194         my $start;
195         my $cfg = $s->[$CONFIG];
196
197         return $s->autosequence() if $cfg->{AUTO_SEQUENCE};
198
199         return '' if not $start = $cfg->{AUTO_NUMBER};
200         local($/) = "\n";
201         my $c = $s->[$CONFIG];
202         if(! defined $c->{AutoNumberCounter}) {
203                 $c->{AutoNumberCounter} = new Vend::CounterFile
204                                                                         $cfg->{AUTO_NUMBER_FILE},
205                                                                         $start,
206                                                                         $c->{AUTO_NUMBER_DATE},
207                                                                         ;
208         }
209         my $num;
210         do {
211                 $num = $c->{AutoNumberCounter}->inc();
212         } while $s->record_exists($num);
213         return $num;
214 }
215
216 # These don't work in non-DBI databases
217 sub commit   { 1 }
218 sub rollback { 0 }
219
220 sub numeric {
221         return exists $_[0]->[$CONFIG]->{NUMERIC}->{$_[1]};
222 }
223
224 sub quote {
225         my($s, $value, $field) = @_;
226         return $value if $s->numeric($field);
227         $value =~ s/'/\\'/g;
228         return "'$value'";
229 }
230
231 sub config {
232         my ($s, $key, $value) = @_;
233         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
234         return $s->[$CONFIG]{$key} unless defined $value;
235         $s->[$CONFIG]{$key} = $value;
236 }
237
238 sub import_db {
239         my($s) = @_;
240         my $db = Vend::Data::import_database($s->[0], 1);
241         return undef if ! $db;
242         $Vend::Database{$s->[0]{name}} = $db;
243         Vend::Data::update_productbase($s->[0]{name});
244         if($db->[$CONFIG]{export_now}) {
245                 Vend::Data::export_database($db);
246                 delete $db->[$CONFIG]{export_now};
247         }
248         return $db;
249 }
250
251 sub close_table {
252     my ($s) = @_;
253         return 1 if ! defined $s->[$TIE_HASH];
254 #::logDebug("closing table $s->[$FILENAME]");
255         undef $s->[$DBM];
256         $s->clear_lock();
257     untie %{$s->[$TIE_HASH]}
258                 or $s->log_error("%s %s: %s", errmsg("untie"), $s->[$FILENAME], $!);
259         undef $s->[$TIE_HASH];
260 #::logDebug("closed table $s->[$FILENAME], self=" . ::uneval($s));
261 }
262
263 sub filter {
264         my ($s, $ary, $col, $filter) = @_;
265         my $column;
266         for(keys %$filter) {
267                 next unless defined ($column = $col->{$_});
268                 $ary->[$column] = Vend::Interpolate::filter_value(
269                                                                 $filter->{$_},
270                                                                 $ary->[$column],
271                                                                 $_,
272                                                   );
273         }
274 }
275
276 sub columns {
277     my ($s) = @_;
278         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
279     return @{$s->[$COLUMN_NAMES]};
280 }
281
282 sub column_exists {
283         return defined test_column(@_);
284 }
285
286 sub test_column {
287     my ($s, $column) = @_;
288         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
289     return $s->[$COLUMN_INDEX]{$column};
290 }
291
292 sub column_index {
293     my ($s, $column) = @_;
294         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
295     my $i = $s->[$COLUMN_INDEX]{$column};
296     die $s->log_error(
297                                 "There is no column named '%s' in %s",
298                                 $column,
299                                 $s->[$FILENAME],
300                         ) unless defined $i;
301     return $i;
302 }
303
304 *test_record = \&record_exists;
305
306 sub record_exists {
307     my ($s, $key) = @_;
308         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
309     my $r = $s->[$DBM]->EXISTS("k$key");
310     return $r;
311 }
312
313 sub name {
314         my ($s) = shift;
315         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
316         return $s->[$CONFIG]{name};
317 }
318
319 sub row_hash {
320     my ($s, $key) = @_;
321         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
322         return undef unless $s->record_exists($key);
323         my %row;
324     @row{ @{$s->[$COLUMN_NAMES]} } = $s->row($key);
325         return \%row;
326 }
327
328 sub unstuff_row {
329     my ($s, $key) = @_;
330         $s->lock_table() if $s->[$CONFIG]{IC_LOCKING};
331     my $line = $s->[$TIE_HASH]{"k$key"};
332         $s->unlock_table() if $s->[$CONFIG]{IC_LOCKING};
333     die $s->log_error(
334                                         "There is no row with index '%s' in database %s",
335                                         $key,
336                                         $s->[$FILENAME],
337                         ) unless defined $line;
338     return map(unstuff($_), split(/\t/, $line, 9999))
339                 unless $s->[$CONFIG]{FILTER_FROM};
340         my @f = map(unstuff($_), split(/\t/, $line, 9999));
341         $s->filter(\@f, $s->[$COLUMN_INDEX], $s->[$CONFIG]{FILTER_FROM});
342         return @f;
343 }
344
345 sub thaw_row {
346     my ($s, $key) = @_;
347         $s->lock_table() if $s->[$CONFIG]{IC_LOCKING};
348     my $line = $s->[$TIE_HASH]{"k$key"};
349         $s->unlock_table() if $s->[$CONFIG]{IC_LOCKING};
350     die $s->log_error( "There is no row with index '%s'", $key,)
351                 unless defined $line;
352     return (@{ Storable::thaw($line) })
353                 unless $s->[$CONFIG]{FILTER_FROM};
354 #::logDebug("filtering.");
355         my $f = Storable::thaw($line);
356         $s->filter($f, $s->[$COLUMN_INDEX], $s->[$CONFIG]{FILTER_FROM});
357         return @{$f};
358 }
359
360 sub field_accessor {
361     my ($s, $column) = @_;
362         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
363     my $index = $s->column_index($column);
364     return sub {
365         my ($key) = @_;
366         return ($s->row($key))[$index];
367     };
368 }
369
370 sub row_settor {
371     my ($s, @cols) = @_;
372         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
373         my @index;
374         my $key_idx = $s->[$KEY_INDEX] || 0;
375         #shift(@cols);
376         for(@cols) {
377         push @index, $s->column_index($_);
378         }
379 #::logDebug("settor index=@index");
380     return sub {
381         my (@vals) = @_;
382                 my @row;
383                 my $key = $vals[$key_idx];
384                 eval {
385                         @row = $s->row($key);
386                 };
387         @row[@index] = @vals;
388 #::logDebug("setting $key indices '@index' to '@vals'");
389         $s->set_row(@row);
390     };
391 }
392
393 sub get_slice {
394     my ($s, $key, $fary) = @_;
395         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
396
397         return undef unless $s->record_exists($key);
398
399         if(ref $fary ne 'ARRAY') {
400                 shift; shift;
401                 $fary = [ @_ ];
402         }
403
404         my @result = ($s->row($key))[ map { $s->column_index($_) } @$fary ];
405         return wantarray ? @result : \@result;
406 }
407
408 sub set_slice {
409         my ($s, $key, $fary, $vary) = @_;
410         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
411
412     if($s->[$CONFIG]{Read_only}) {
413                 $s->log_error(
414                         "Attempt to set slice of %s in read-only table %s",
415                         $key,
416                         $s->[$CONFIG]{name},
417                 );
418                 return undef;
419         }
420
421         my $opt;
422         if (ref ($key) eq 'ARRAY') {
423                 $opt = shift @$key;
424                 $key = shift @$key;
425         }
426         $opt = {}
427                 unless ref ($opt) eq 'HASH';
428
429         $opt->{dml} = 'upsert'
430                 unless defined $opt->{dml};
431
432         if(ref $fary ne 'ARRAY') {
433                 my $href = $fary;
434                 if(ref $href ne 'HASH') {
435                         $href = { splice (@_, 2) };
436                 }
437                 $vary = [ values %$href ];
438                 $fary = [ keys   %$href ];
439         }
440
441         my $keyname = $s->[$CONFIG]{KEY};
442
443         my ($found_key) = grep $_ eq $keyname, @$fary;
444
445         if(! $found_key) {
446                 unshift @$fary, $keyname;
447                 unshift @$vary, $key;
448         }
449
450         my @current;
451
452         if ($s->record_exists($key)) {
453                 if ($opt->{dml} eq 'insert') {
454                         $s->log_error(
455                                 "Duplicate key on set_slice insert for key '$key' on table %s",
456                                 $s->[$CONFIG]{name},
457                         );
458                         return undef;
459                 }
460                 @current = $s->row($key);
461         }
462         elsif ($opt->{dml} eq 'update') {
463                 $s->log_error(
464                         "No record to update set_slice for key '$key' on table %s",
465                         $s->[$CONFIG]{name},
466                 );
467                 return undef;
468         }
469
470         @current[ map { $s->column_index($_) } @$fary ] = @$vary;
471
472         $key = $s->set_row(@current);
473         length($key) or
474                 $s->log_error(
475                         "Did set_slice with empty key on table %s",
476                         $s->[$CONFIG]{name},
477                 );
478
479         return $key;
480 }
481
482 sub field_settor {
483     my ($s, $column) = @_;
484         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
485     my $index = $s->column_index($column);
486     return sub {
487         my ($key, $value) = @_;
488         my @row = $s->row($key);
489         $row[$index] = $value;
490         $s->set_row(@row);
491     };
492 }
493
494 sub clone_row {
495         my ($s, $old, $new) = @_;
496         return undef unless $s->record_exists($old);
497         my @ary = $s->row($old);
498         $ary[$s->[$KEY_INDEX]] = $new;
499         $s->set_row(@ary);
500         return $new;
501 }
502
503 sub clone_set {
504         my ($s, $col, $old, $new) = @_;
505         return unless $s->column_exists($col);
506         my $sel = $s->quote($old, $col);
507         my $name = $s->[$CONFIG]{name};
508         my ($ary, $nh, $na) = $s->query("select * from $name where $col = $sel");
509         my $fpos = $nh->{$col} || return undef;
510         $s->config('AUTO_NUMBER', '000001') unless $s->config('AUTO_NUMBER');
511         for(@$ary) {
512                 my $line = $_;
513                 $line->[$s->[$KEY_INDEX]] = '';
514                 $line->[$fpos] = $new;
515                 $s->set_row(@$line);
516         }
517         return $new;
518 }
519
520 sub stuff_row {
521     my ($s, @fields) = @_;
522         my $key = $fields[$s->[$KEY_INDEX]];
523
524 #::logDebug("stuff key=$key");
525         $fields[$s->[$KEY_INDEX]] = $key = $s->autonumber()
526                 if ! length($key);
527         $s->filter(\@fields, $s->[$COLUMN_INDEX], $s->[$CONFIG]{FILTER_TO})
528                 if $s->[$CONFIG]{FILTER_TO};
529         $s->lock_table();
530
531     $s->[$TIE_HASH]{"k$key"} = join("\t", map(stuff($_), @fields));
532
533         $s->unlock_table();
534         return $key;
535 }
536
537 sub freeze_row {
538     my ($s, @fields) = @_;
539         my $key = $fields[$s->[$KEY_INDEX]];
540 #::logDebug("freeze key=$key");
541         $fields[$s->[$KEY_INDEX]] = $key = $s->autonumber()
542                 if ! length($key);
543         $s->filter(\@fields, $s->[$COLUMN_INDEX], $s->[$CONFIG]{FILTER_TO})
544                 if $s->[$CONFIG]{FILTER_TO};
545         $s->lock_table();
546         $s->[$TIE_HASH]{"k$key"} = Storable::freeze(\@fields);
547         $s->unlock_table();
548         return $key;
549 }
550
551 if($Storable) {
552         *set_row = \&freeze_row;
553         *row = \&thaw_row;
554 }
555 else {
556         *set_row = \&stuff_row;
557         *row = \&unstuff_row;
558 }
559
560 sub foreign {
561     my ($s, $key, $foreign) = @_;
562         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
563         $key = $s->quote($key, $foreign);
564     my $q = "select $s->[$CONFIG]{KEY} from $s->[$CONFIG]{name} where $foreign = $key";
565 #::logDebug("foreign key query = $q");
566     my $ary = $s->query({ sql => $q });
567 #::logDebug("foreign key query returned" . ::uneval($ary));
568         return undef unless $ary and $ary->[0];
569         return $ary->[0][0];
570 }
571
572 sub field {
573     my ($s, $key, $column) = @_;
574         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
575     return ($s->row($key))[$s->column_index($column)];
576 }
577
578 sub set_field {
579     my ($s, $key, $column, $value) = @_;
580         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
581     if($s->[$CONFIG]{Read_only}) {
582                 $s->log_error(
583                         "Attempt to write %s in read-only table",
584                         "$s->[$CONFIG]{name}::${column}::$key",
585                 );
586                 return undef;
587         }
588     my @row;
589         if($s->record_exists($key)) {
590                 @row = $s->row($key);
591         }
592         else {
593                 $row[$s->[$KEY_INDEX]] = $key;
594         }
595     $row[$s->column_index($column)] = $value
596                 if $column;
597     $s->set_row(@row);
598         $value;
599 }
600
601 sub inc_field {
602     my ($s, $key, $column, $adder) = @_;
603         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
604     my($value);
605     if($s->[$CONFIG]{Read_only}) {
606                 $s->log_error(
607                         "Attempt to write %s in read-only table",
608                         "$s->[$CONFIG]{name}::${column}::$key",
609                 );
610                 return undef;
611         }
612     my @row = $s->row($key);
613         my $idx = $s->column_index($column);
614 #::logDebug("ready to increment key=$key column=$column adder=$adder idx=$idx row=" . ::uneval(\@row));
615     $value = $row[$s->column_index($column)] += $adder;
616     $s->set_row(@row);
617     return $value;
618 }
619
620 sub create_sql {
621     return undef;
622 }
623
624 sub touch {
625     my ($s) = @_;
626         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
627     my $now = time();
628     utime $now, $now, $s->[$FILENAME];
629 }
630
631 sub ref {
632         my $s = shift;
633         return $s if defined $s->[$TIE_HASH];
634         return $s->import_db();
635 }
636
637 sub sort_each {
638         my($s, $sort_field, $sort_option) = @_;
639         if(length $sort_field) {
640                 my $opt = {};
641                 $opt->{to} = $sort_option
642                         if $sort_option;
643                 $opt->{ml} = 99999;
644                 $opt->{st} = 'db';
645                 $opt->{tf} = $sort_field;
646                 $opt->{query} = "select * from $s->[$CONFIG]{name}";
647                 $s->[$EACH] = $s->query($opt);
648                 return;
649         }
650 }
651
652 sub each_sorted {
653         my $s = shift;
654         if(! defined $s->[$EACH][0]) {
655                 undef $s->[$EACH];
656                 return ();
657         }
658         my $k = $s->[$EACH][0][$s->[$KEY_INDEX]];
659         return ($k, @{shift @{ $s->[$EACH] } });
660 }
661
662 sub each_record {
663     my ($s) = @_;
664         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
665     my $key;
666
667         return $s->each_sorted() if defined $s->[$EACH];
668     for (;;) {
669         $key = each %{$s->[$TIE_HASH]};
670         if (defined $key) {
671             if ($key =~ s/^k//) {
672                 return ($key, $s->row($key));
673             }
674         }
675         else {
676             return ();
677         }
678     }
679 }
680
681 my $sup;
682 my $restrict;
683 my $rfield;
684 my $hfield;
685 my $rsession;
686
687 sub each_nokey {
688     my ($s, $qual) = @_;
689         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
690     my ($key, $hf);
691
692         if (! defined $restrict) {
693                 # Support hide_field
694                 if($qual) {
695 #::logDebug("Found qual=$qual");
696                         $hfield = $qual;
697                         if($hfield =~ s/^\s+WHERE\s+(\w+)\s*!=\s*1($|\s+)//) {
698                                 $hf = $1;
699 #::logDebug("Found hf=$hf");
700                                 $s->test_column($hf) and $hfield = $s->column_index($hf);
701                         }
702                         else {
703                                 undef $hfield;
704                         }
705
706 #::logDebug("hf index=$hfield");
707                 }
708                 if($restrict = ($Vend::Cfg->{TableRestrict}{$s->config('name')} || 0)) {
709 #::logDebug("restricted?");
710                 $sup =  ! defined $Global::SuperUserFunction
711                                         ||
712                                 $Global::SuperUserFunction->();
713                 if($sup) {
714                         $restrict = 0;
715                 }
716                 else {
717                         ($rfield, $rsession) = split /\s*=\s*/, $restrict;
718                         $s->test_column($rfield) and $rfield = $s->column_index($rfield)
719                                 or $restrict = 0;
720                         $rsession = $Vend::Session->{$rsession};
721                 }
722         }
723
724                 $restrict = 1 if $hfield and $s->[$CONFIG]{HIDE_FIELD} eq $hf;
725
726         }
727
728     for (;;) {
729         $key = each %{$s->[$TIE_HASH]};
730 #::logDebug("each_nokey: $key field=$rfield sup=$sup");
731                 if(! defined $key) {
732                         undef $restrict;
733                         return ();
734                 }
735                 $key =~ s/^k// or next;
736                 if($restrict) {
737                         my (@row) = $s->row($key);
738 #::logDebug("each_nokey: rfield='$row[$rfield]' eq '$rsession' ??") if defined $rfield;
739 #::logDebug("each_nokey: hfield='$row[$hfield]'") if defined $hfield;
740                         next if defined $hfield and $row[$hfield];
741                         next if defined $rfield and $row[$rfield] ne $rsession;
742                         return \@row;
743                 }
744                 return [ $s->row($key) ];
745     }
746 }
747
748 sub suicide { 1 }
749
750 sub isopen {
751         return defined $_[0]->[$TIE_HASH];
752 }
753
754 sub delete_record {
755     my ($s, $key) = @_;
756         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
757     if($s->[$CONFIG]{Read_only}) {
758                 $s->log_error(
759                         "Attempt to delete row '$key' in read-only table %s",
760                         $key,
761                         $s->[$CONFIG]{name},
762                 );
763                 return undef;
764         }
765
766 #::logDebug("delete row $key from $s->[$FILENAME]");
767     delete $s->[$TIE_HASH]{"k$key"};
768         1;
769 }
770
771 sub sprintf_substitute {
772         my ($s, $query, $fields, $cols) = @_;
773         return sprintf $query, @$fields;
774 }
775
776 sub hash_query {
777         my ($s, $query, $opt) = @_;
778         $opt ||= {};
779         $opt->{query} = $query;
780         $opt->{hashref} = 1;
781         return scalar $s->query($opt);
782 }
783
784 sub query {
785     my($s, $opt, $text, @arg) = @_;
786
787     if(! CORE::ref($opt)) {
788         unshift @arg, $text if defined $text;
789         $text = $opt;
790         $opt = {};
791     }
792
793         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
794         $opt->{query} = $opt->{sql} || $text if ! $opt->{query};
795
796 #::logDebug("receieved query. object=" . ::uneval_it($opt));
797
798         if(defined $opt->{values}) {
799                 @arg = $opt->{values} =~ /['"]/
800                                 ? ( Text::ParseWords::shellwords($opt->{values})  )
801                                 : (grep /\S/, split /\s+/, $opt->{values});
802                 @arg = @{$::Values}{@arg};
803         }
804
805         if($opt->{type}) {
806                 $opt->{$opt->{type}} = 1 unless defined $opt->{$opt->{type}};
807         }
808
809         my $query;
810     $query = ! scalar @arg
811                         ? $opt->{query}
812                         : sprintf_substitute ($s, $opt->{query}, \@arg);
813
814         my $codename = defined $s->[$CONFIG]{KEY} ? $s->[$CONFIG]{KEY} : 'code';
815         my $ref;
816         my $relocate;
817         my $return;
818         my $spec;
819         my $stmt;
820         my $update = '';
821         my %nh;
822         my @na;
823         my @update_fields;
824         my @out;
825
826         if($opt->{STATEMENT}) {
827                  $stmt = $opt->{STATEMENT};
828                  $spec = $opt->{SPEC};
829 #::logDebug('rerouted. Command is ' . $stmt->command());
830         }
831         else {
832                 eval {
833                         ($spec, $stmt) = Vend::Scan::sql_statement($query, $opt);
834                 };
835                 if($@) {
836                         my $msg = errmsg("SQL query failed: %s\nquery was: %s", $@, $query);
837                         $s->log_error($msg);
838                         Carp::croak($msg) if $Vend::Try;
839                         return ($opt->{failure} || undef);
840                 }
841                 my @additions = grep length($_) == 2, keys %$opt;
842                 for(@additions) {
843                         next unless length $opt->{$_};
844                         $spec->{$_} = $opt->{$_};
845                 }
846         }
847         my @tabs = @{$spec->{rt} || $spec->{fi}};
848
849         my $reroute;
850         my $tname = $s->[$CONFIG]{name};
851         if ($tabs[0] ne $tname) {
852                 if("$tabs[0]_txt" eq $tname or "$tabs[0]_asc" eq $tname) {
853                         $tabs[0] = $spec->{fi}[0] = $tname;
854                 }
855                 else {
856                         $reroute = 1;
857                 }
858         }
859
860         if($reroute) {
861                 unless ($reroute = $Vend::Database{$tabs[0]}) {
862                         $s->log_error("Table %s not found in databases", $tabs[0]);
863                         return $opt->{failure} || undef;
864                 }
865                 $s = $reroute;
866 #::logDebug("rerouting to $tabs[0]");
867                 $opt->{STATEMENT} = $stmt;
868                 $opt->{SPEC} = $spec;
869                 return $s->query($opt, $text);
870         }
871
872 eval {
873
874         my @vals;
875         if($stmt->command() ne 'SELECT') {
876                 if(defined $s and $s->[$CONFIG]{Read_only}) {
877                         $s->log_error(
878                                         "Attempt to write read-only table %s",
879                                         $s->[$CONFIG]{name},
880                         );
881                         return undef;
882                 }
883                 $update = $stmt->command();
884                 @vals = $stmt->row_values();
885 #::logDebug("row_values returned=" . ::uneval(\@vals));
886         }
887
888
889         @na = @{$spec->{rf}}     if $spec->{rf};
890
891 #::logDebug("spec->{ml}=$spec->{ml} opt->{ml}=$opt->{ml}");
892         $spec->{ml} = $opt->{ml} if $opt->{ml};
893         $spec->{ml} ||= '1000';
894         $spec->{fn} = [$s->columns];
895
896         my $sub;
897
898         if($update eq 'INSERT') {
899                 if(! $spec->{rf} or  $spec->{rf}[0] eq '*') {
900                         @update_fields = @{$spec->{fn}};
901                 }
902                 else {
903                         @update_fields = @{$spec->{rf}};
904                 }
905 #::logDebug("update fields: " . uneval(\@update_fields));
906                 @na = $codename;
907                 $sub = $s->row_settor(@update_fields);
908         }
909         elsif($update eq 'UPDATE') {
910                 @update_fields = @{$spec->{rf}};
911 #::logDebug("update fields: " . uneval(\@update_fields));
912                 my $key = $s->config('KEY');
913                 @na = ($codename);
914                 $sub = sub {
915                                         my $key = shift;
916                                         $s->set_slice($key, [@update_fields], \@_);
917                                 };
918         }
919         elsif($update eq 'DELETE') {
920                 @na = $codename;
921                 $sub = sub { delete_record($s, @_) };
922         }
923         else {
924                 @na = @{$spec->{fn}}   if ! scalar(@na) || $na[0] eq '*';
925         }
926
927         $spec->{rf} = [@na];
928
929 #::logDebug("tabs='@tabs' columns='@na' vals='@vals' uf=@update_fields update=$update"); 
930
931     my $search;
932     if (! defined $opt->{st} or "\L$opt->{st}" eq 'db' ) {
933                 for(@tabs) {
934                         s/\..*//;
935                 }
936         $search = new Vend::DbSearch;
937 #::logDebug("created DbSearch object: " . ::uneval_it($search));
938         }
939         else {
940         $search = new Vend::TextSearch;
941 #::logDebug("created TextSearch object: " . ::uneval_it($search));
942     }
943
944         my %fh;
945         my $i = 0;
946         %nh = map { (lc $_, $i++) } @na;
947         $i = 0;
948         %fh = map { ($_, $i++) } @{$spec->{fn}};
949
950 #::logDebug("field hash: " . Vend::Util::uneval_it(\%fh)); 
951         for ( qw/rf sf/ ) {
952                 next unless defined $spec->{$_};
953                 map { $_ = $fh{$_} } @{$spec->{$_}};
954         }
955
956         if($update) {
957                 $opt->{row_count} = 1;
958                 die "Reached update query without object"
959                         if ! $s;
960 #::logDebug("Update operation is $update, sub=$sub");
961                 die "Bad row settor for columns @na"
962                         if ! $sub;
963                 if($update eq 'INSERT') {
964                         $sub->(@vals);
965                         $ref = [[ $vals[0] ]];
966                 }
967                 else {
968                         $ref = $search->array($spec);
969                         for(@$ref) {
970 #::logDebug("returned =" . uneval($_) . ", update values: " . uneval(\@vals));
971                                 $sub->($_->[0], @vals);
972                         }
973                 }
974         }
975         elsif ($opt->{hashref}) {
976                 $ref = $Vend::Interpolate::Tmp->{$opt->{hashref}} = $search->hash($spec);
977         }
978         else {
979 #::logDebug(    " \$Vend::Interpolate::Tmp->{$opt->{arrayref}}");
980                 $ref = $Vend::Interpolate::Tmp->{$opt->{arrayref} || ''}
981                          = $search->array($spec);
982                 $opt->{object} = $search;
983                 $opt->{prefix} = 'sql' unless defined $opt->{prefix};
984         }
985 };
986 #::logDebug("search spec: " . Vend::Util::uneval($spec));
987 #::logDebug("name hash: " . Vend::Util::uneval(\%nh));
988 #::logDebug("ref returned: " . substr(Vend::Util::uneval($ref), 0, 100));
989 #::logDebug("opt is: " . Vend::Util::uneval($opt));
990         if($@) {
991                 $s->log_error(
992                                 "MVSQL query failed for %s: %s\nquery was: %s",
993                                 $opt->{table},
994                                 $@,
995                                 $query,
996                         );
997                 $return = $opt->{failure} || undef;
998         }
999
1000         if($opt->{search_label}) {
1001                 $::Instance->{SearchObject}{$opt->{search_label}} = {
1002                         mv_results => $ref,
1003                         mv_field_names => \@na,
1004                 };
1005         }
1006
1007         if ($opt->{row_count}) {
1008                 my $rc = $ref ? scalar @$ref : 0;
1009                 return $rc unless $opt->{list};
1010                 $ref = [ [ $rc ] ];
1011                 @na = [ 'row_count' ];
1012                 %nh = ( 'rc' => 0, 'count' => 0, 'row_count' => 0 );
1013         }
1014
1015         return Vend::Interpolate::tag_sql_list($text, $ref, \%nh, $opt, \@na)
1016                 if $opt->{list};
1017         return Vend::Interpolate::html_table($opt, $ref, \@na)
1018                 if $opt->{html};
1019         return Vend::Util::uneval($ref)
1020                 if $opt->{textref};
1021         return wantarray ? ($ref, \%nh, \@na) : $ref;
1022 }
1023
1024 *import_quoted = *import_csv = \&import_ascii_delimited;
1025
1026 my %Sort = (
1027
1028     ''  => sub { $a cmp $b              },
1029     none    => sub { $a cmp $b              },
1030     f   => sub { (lc $a) cmp (lc $b)    },
1031     fr  => sub { (lc $b) cmp (lc $a)    },
1032     n   => sub { $a <=> $b              },
1033     nr  => sub { $b <=> $a              },
1034     r   => sub { $b cmp $a              },
1035     rf  => sub { (lc $b) cmp (lc $a)    },
1036     rn  => sub { $b <=> $a              },
1037 );
1038
1039 my $fafh;
1040 sub file_access {
1041         my $function = shift;
1042         return <$fafh> 
1043 }
1044
1045 sub import_ascii_delimited {
1046     my ($infile, $options, $table_name) = @_;
1047         my ($format, $csv);
1048
1049         my $delimiter = quotemeta($options->{'delimiter'});
1050
1051         if  ($delimiter eq 'CSV') {
1052                 $csv = 1;
1053                 $format = 'CSV';
1054         }
1055         elsif ($options->{CONTINUE}) {
1056                 $format = uc $options->{CONTINUE};
1057         }
1058         else {
1059                 $format = 'NONE';
1060         }
1061
1062         my $realfile;
1063         if($options->{PRELOAD}) {
1064                 # do not preload if $infile is a scalar reference
1065                 if ($options->{scalar_ref} or 
1066                         (-f $infile and $options->{PRELOAD_EMPTY_ONLY})) {
1067                         # Do nothing, no preload
1068                 }
1069                 else {
1070                         $realfile = -f $infile ? $infile : '';
1071                         $infile = $options->{PRELOAD};
1072                         $infile = "$Global::VendRoot/$infile" if ! -f $infile;
1073                         ($infile = $realfile, undef $realfile) if ! -f $infile;
1074                 }
1075         }
1076
1077         if(! defined $realfile) {
1078                 if($options->{scalar_ref}){
1079                         open(IN, '+<', $infile)
1080                                 or die errmsg("%s %s: %s\n", errmsg("open scalar reference"), *$infile, $!);
1081                         # locking of scalar reference filehandles in unsupported
1082                 }
1083                 else{
1084                         open(IN, "+<$infile")
1085                                 or die errmsg("%s %s: %s\n", errmsg("open read/write"), $infile, $!);
1086                         lockfile(\*IN, 1, 1)
1087                                 or die errmsg("%s %s: %s\n", errmsg("lock"), $infile, $!);
1088                 }
1089
1090         }
1091         else {
1092                 open(IN, "<$infile")
1093                         or die errmsg("%s %s: %s\n", errmsg("open"), $infile, $!);
1094         }
1095
1096         new_filehandle(\*IN);
1097
1098         # we should be inputting as UTF8 if we're so configured
1099         binmode(\*IN, ':utf8') if $::Variable->{MV_UTF8} || $Global::Variable->{MV_UTF8};
1100
1101         my $field_hash;
1102         my $para_sep;
1103         my $codere = '[\w-_#/.]+';
1104         my $idx = 0;
1105
1106         my($field_count, @field_names);
1107         
1108         if($options->{hs}) {
1109                 my $i = 0;
1110                 <IN> while $i++ < $options->{hs};
1111         }
1112         if($options->{field_names}) {
1113                 @field_names = @{$options->{field_names}};
1114
1115                 # This pulls COLUMN_DEF out of a field name
1116                 # remains in ASCII file, though
1117                 separate_definitions($options,\@field_names);
1118
1119                 if($options->{CONTINUE} eq 'NOTES') {
1120                         $para_sep = $options->{NOTES_SEPARATOR} ||$options->{SEPARATOR} || "\f";
1121                         $field_hash = {};
1122                         for(@field_names) {
1123                                 $field_hash->{$_} = $idx++;
1124                         }
1125                         $idx = $#field_names;
1126                 }
1127         }
1128         else {
1129                 my $field_names;
1130                 if ($csv) {
1131                         @field_names = read_quoted_fields(\*IN);
1132                 }
1133                 else {
1134                         $field_names = <IN>;
1135                         chomp $field_names;
1136                         $field_names =~ s/\s+$// unless $format eq 'NOTES';
1137                         @field_names = split(/$delimiter/, $field_names);
1138                 }
1139
1140                 # This pulls COLUMN_DEF out of a field name
1141                 # remains in ASCII file, though
1142                 separate_definitions($options,\@field_names);
1143
1144 #::logDebug("field names: @field_names");
1145                 if($format eq 'NOTES') {
1146                         $field_hash = {};
1147                         for(@field_names) {
1148                                 s/:.*//;        
1149                                 if(/\S[ \t]+/) {
1150                                         die "Only one notes field allowed in NOTES format.\n"
1151                                                 if $para_sep;
1152                                         $para_sep = $_;
1153                                         $_ = '';
1154                                 }
1155                                 else {
1156                                         $field_hash->{$_} = $idx++;
1157                                 }
1158                         }
1159                         my $msg;
1160                         @field_names = grep $_, @field_names;
1161                         $para_sep =~ s/($codere)[\t ]*(.)/$2/;
1162                         push(@field_names, ($1 || 'notes_field'));
1163                         $idx = $#field_names;
1164                         $para_sep = $options->{NOTES_SEPARATOR} || "\f";
1165                 }
1166         }
1167
1168         local($/) = "\n" . $para_sep ."\n"
1169                 if $para_sep;
1170
1171         $field_count = scalar @field_names;
1172
1173         no strict 'refs';
1174     my $out;
1175         if($options->{ObjectType}) {
1176                 $out = &{"$options->{ObjectType}::create"}(
1177                                                                         $options->{ObjectType},
1178                                                                         $options,
1179                                                                         \@field_names,
1180                                                                         $table_name,
1181                                                                 );
1182         }
1183         else {
1184                 $out = $options->{Object};
1185         }
1186
1187         if(! $out) {
1188                 die errmsg(q{No database object for table: %s
1189
1190 Probable mismatch of Database directive to database type,
1191 for example calling DBI without proper modules or database
1192 access.
1193 },
1194                                         $table_name,
1195                                         );
1196         }
1197         my $fields;
1198     my (@fields, $key);
1199         my @addl;
1200         my $excel = '';
1201         my $excel_addl = '';
1202
1203         if($options->{EXCEL}) {
1204         #Fix for quoted includes supplied by Larry Lesczynski
1205                 $excel = <<'EndOfExcel';
1206                         if(/"[^\t]*(?:,|"")/) {
1207                                 for (@fields) {
1208                                         next unless /[,"]/;
1209                                         s/^"//;
1210                                         s/"$//;
1211                                         s/""/"/g;
1212                                 }
1213                         }
1214 EndOfExcel
1215                 $excel_addl = <<'EndOfExcel';
1216                         if(/"[^\t]*(?:,|"")/) {
1217                                 for (@addl) {
1218                                         next unless /,/;
1219                                         s/^"//;
1220                                         s/"$//;
1221                                 }
1222                         }
1223 EndOfExcel
1224         }
1225         
1226         my $index = '';
1227         my @fh; # Array of file handles for sort
1228         my @fc; # Array of file handles for copy when symlink fails
1229         my @i;  # Array of field names for sort
1230         my @o;  # Array of sort options
1231         my %comma;
1232         if($options->{INDEX} and ! $options->{NO_ASCII_INDEX}) {
1233                 my @f; my $f;
1234                 my @n;
1235                 my $i;
1236                 @f = @{$options->{INDEX}};
1237                 foreach $f (@f) {
1238                         my $found = 0;
1239                         $i = 0;
1240                         if( $f =~ s/:(.*)//) {
1241                                 my $option = $1;
1242                                 push @o, $1;
1243                         }
1244                         elsif (exists $options->{INDEX_OPTIONS}{$f}) {
1245
1246                                 push @o, $options->{INDEX_OPTIONS}{$f};
1247                         }
1248                         else {
1249                                 push @o, '';
1250                         }
1251                         for(@field_names) {
1252                                 if($_ eq $f) {
1253                                         $found++;
1254                                         push(@i, $i);
1255                                         push(@n, $f);
1256                                         last;
1257                                 }
1258                                 $i++;
1259                         }
1260                         (pop(@o), next) unless $found;
1261                 }
1262                 if(@i) {
1263                         require IO::File;
1264                         my $fh;
1265                         my $f_string = join ",", @i;
1266                         @f = ();
1267                         for($i = 0; $i < @i; $i++) {
1268                                 my $fnum = $i[$i];
1269                                 $fh = new IO::File "> $infile.$i[$i]";
1270                                 die errmsg("%s %s: %s\n", errmsg("create"), "$infile.$i[$i]",
1271                                 $!) unless defined $fh;
1272
1273                                 new_filehandle($fh);
1274
1275                                 eval {
1276                                         unlink "$infile.$n[$i]" if -l "$infile.$n[$i]";
1277                                         symlink "$infile.$i[$i]", "$infile.$n[$i]";
1278                                 };
1279                                 push @fc, ["$infile.$i[$i]", "$infile.$n[$i]"]
1280                                         if $@;
1281                                 push @fh, $fh;
1282                                 if($o[$i] =~ s/c//) {
1283                                         $index .= <<EndOfIndex;
1284                         map { print { \$fh[$i] } "\$_\\t\$fields[0]\\n" } split /\\s*,\\s*/, \$fields[$fnum];
1285 EndOfIndex
1286                                 }
1287                                 elsif($o[$i] =~ s/s//) {
1288                                         $index .= <<EndOfIndex;
1289                         map { print { \$fh[$i] } "\$_\\t\$fields[0]\\n" } split /\\s*;\\s*/, \$fields[$fnum];
1290 EndOfIndex
1291                                 }
1292                                 else {
1293                                         $index .= <<EndOfIndex;
1294                         print { \$fh[$i] } "\$fields[$fnum]\\t\$fields[0]\\n";
1295 EndOfIndex
1296                                 }
1297                         }
1298                 }
1299         }
1300
1301         my $numeric_guess = '';
1302         my $numeric_clean = '';
1303         my %non_numeric;
1304         my @empty;
1305         my @possible;
1306         my $clean;
1307
1308         if($options->{GUESS_NUMERIC} and $options->{type} ne '8') {
1309                 @possible = (0 .. $#field_names);
1310                 @empty = map { 1 } (0 .. $#field_names);
1311                 
1312                 $numeric_guess = <<'EOF';
1313                         for (@possible) {
1314                                 ($empty[$_] = 0, next) if $fields[$_] =~ /^-?\d+\.?\d*$/;
1315                                 next if $empty[$_] && ! length($fields[$_]);
1316                                 $empty[$_] = undef;
1317                                 $clean = 1;
1318                                 $non_numeric{$_} = 1;
1319                         }
1320 EOF
1321                 $numeric_clean = <<'EOF';
1322                         next unless $clean;
1323                         undef $clean;
1324                         @possible = grep ! $non_numeric{$_}, @possible;
1325                         %non_numeric = ();
1326 EOF
1327         }
1328
1329 my %format = (
1330
1331         NOTES => <<EndOfRoutine,
1332         while (<IN>) {
1333             chomp;
1334                         \@fields = ();
1335                         s/\\r?\\n\\r?\\n((?s:.)*)//
1336                                 and \$fields[$idx] = \$1;
1337
1338                         while(s!($codere):[ \\t]*(.*)\\n?!!) {
1339                                 next unless defined \$field_hash->{\$1};
1340                                 \$fields[\$field_hash->{\$1}] = \$2;
1341                         }
1342                         $index
1343                         $numeric_guess
1344             \$out->set_row(\@fields);
1345                         $numeric_clean
1346         }
1347 EndOfRoutine
1348
1349         LINE => <<EndOfRoutine,
1350         while (<IN>) {
1351             chomp;
1352                         \$fields = \@fields = split(/$delimiter/, \$_, $field_count);
1353                         $index
1354                         push (\@fields, '') until \$fields++ >= $field_count;
1355                         $numeric_guess
1356             \$out->set_row(\@fields);
1357                         $numeric_clean
1358         }
1359 EndOfRoutine
1360
1361         CSV => <<EndOfRoutine,
1362                 while (\@fields = read_quoted_fields(\\*IN)) {
1363             \$fields = scalar \@fields;
1364                         $index
1365             push (\@fields, '') until \$fields++ >= $field_count;
1366                         $numeric_guess
1367             \$out->set_row(\@fields);
1368                         $numeric_clean
1369         }
1370 EndOfRoutine
1371
1372         NONE => <<EndOfRoutine,
1373         while (<IN>) {
1374             chomp;
1375             \$fields = \@fields = split(/$delimiter/, \$_, 99999);
1376                         $excel
1377                         $index
1378             push (\@fields, '') until \$fields++ >= $field_count;
1379                         $numeric_guess
1380             \$out->set_row(\@fields);
1381                         $numeric_clean
1382         }
1383 EndOfRoutine
1384
1385         UNIX => <<EndOfRoutine,
1386         while (<IN>) {
1387             chomp;
1388                         if(s/\\\\\$//) {
1389                                 \$_ .= <IN>;
1390                                 redo;
1391                         }
1392                         elsif (s/<<(\\w+)\$//) {
1393                                 my \$mark = \$1;
1394                                 my \$line = \$_;
1395                                 \$line .= Vend::Config::read_here(\\*IN, \$mark);
1396                                 \$_ = \$line;
1397                                 redo;
1398                         }
1399
1400             \$fields = \@fields = split(/$delimiter/, \$_, 99999);
1401                         $excel
1402                         $index
1403             push (\@fields, '') until \$fields++ >= $field_count;
1404                         $numeric_guess
1405             \$out->set_row(\@fields);
1406                         $numeric_clean
1407         }
1408 EndOfRoutine
1409
1410         DITTO => <<EndOfRoutine,
1411         while (<IN>) {
1412             chomp;
1413                         if(/^$delimiter/) {
1414                                 \$fields = \@addl = split /$delimiter/, \$_, 99999;
1415                                 shift \@addl;
1416                                 $excel_addl
1417                                 my \$i;
1418                                 for(\$i = 0; \$i < \@addl; \$i++) {
1419                                         \$fields[\$i] .= "\n\$addl[\$i]"
1420                                                 if \$addl[\$i] ne '';
1421                                 }
1422                         }
1423                         else {
1424                                 \$fields = \@fields = split(/$delimiter/, \$_, 99999);
1425                                 $excel
1426                                 $index
1427                                 push (\@fields, '') until \$fields++ >= $field_count;
1428                         }
1429                         $numeric_guess
1430             \$out->set_row(\@fields);
1431                         $numeric_clean
1432         }
1433 EndOfRoutine
1434
1435 );
1436
1437     eval $format{$format};
1438         die errmsg("%s import into %s failed: %s", $options->{name}, $options->{table}, $@) if $@;
1439     if($realfile) {
1440                 close IN
1441                         or die errmsg("%s %s: %s\n", errmsg("close"), $infile, $!);
1442                 if(-f $realfile) {
1443                         open(IN, "+<$realfile")
1444                                 or die
1445                                         errmsg("%s %s: %s\n", errmsg("open read/write"), $realfile, $!);
1446                         lockfile(\*IN, 1, 1)
1447                                 or die errmsg("%s %s: %s\n", errmsg("lock"), $realfile, $!);
1448                         new_filehandle(\*IN);
1449                         <IN>;
1450                         eval $format{$format};
1451                         die errmsg("%s %s: %s\n", errmsg("import"), $options->{name}, $!) if $@;
1452                 }
1453                 elsif (! open(IN, ">$realfile") && new_filehandle(\*IN) ) {
1454                                 die errmsg("%s %s: %s\n", errmsg("create"), $realfile, $!);
1455                 } 
1456                 else {
1457                         print IN join($options->{DELIMITER}, @field_names);
1458                         print IN $/;
1459                         close IN;
1460                 }
1461         }
1462         if(@fh) {
1463                 my $no_sort;
1464                 my $sort_sub;
1465                 my $ftest = Vend::Util::catfile($Vend::Cfg->{ScratchDir}, 'sort.test');
1466                 my $cmd = "echo you_have_no_sort_but_we_will_cope | sort -f -n -o $ftest";
1467                 system $cmd;
1468                 $no_sort = 1 if ! -f $ftest;
1469                 
1470                 my $fh;
1471                 my $i;
1472                 for ($i = 0; $i < @fh; $i++) {
1473                         close $fh[$i] or die "close: $!";
1474                         unless ($no_sort) {
1475                                 $o[$i] = "-$o[$i]" if $o[$i];
1476                                 $cmd = "sort $o[$i] -o $infile.$i[$i] $infile.$i[$i]";
1477                                 system $cmd;
1478                         }
1479                         else {
1480                                 $fh = new IO::File "$infile.$i[$i]";
1481                                 new_filehandle($fh);
1482                                 my (@lines) = <$fh>;
1483                                 close $fh or die "close: $!";
1484                                 my $option = $o[$i] || 'none';
1485                                 @lines = sort { &{$Sort{$option}} } @lines;
1486                                 $fh = new IO::File ">$infile.$i[$i]";
1487                                 new_filehandle($fh);
1488                                 print $fh @lines;
1489                                 close $fh or die "close: $!";
1490                         }
1491                 }
1492         }
1493         if(@fc) {
1494                 require File::Copy;
1495                 for(@fc) {
1496                         File::Copy::copy(@{$_});
1497                 }
1498         }
1499
1500         unless($options->{no_commit}) {
1501                 $out->commit() if $out->config('HAS_TRANSACTIONS');
1502         }
1503         delete $out->[$CONFIG]{Clean_start};
1504         delete $out->[$CONFIG]{_Dirty};
1505         unless($options->{scalar_ref}){
1506                 unlockfile(\*IN) or die "unlock\n";
1507         }
1508     close(IN);
1509         my $dot = $out->[$CONFIG]{HIDE_AUTO_FILES} ? '.' : '';
1510         if($numeric_guess) {
1511                 my $fn = Vend::Util::catfile($out->[$CONFIG]{DIR}, "$dot$out->[$CONFIG]{file}");
1512                 Vend::Util::writefile(
1513                                         ">$fn.numeric",
1514                                         join " ", map { $field_names[$_] } @possible,
1515                 );
1516         }
1517     return $out;
1518 }
1519
1520 sub import_from_ic_db {
1521     my ($infile, $options, $table_name) = @_;
1522
1523         my $tname = $options->{MIRROR}
1524                 or die errmsg(
1525                                 "Memory mirror table not specified for table %s.",
1526                                 $table_name,
1527                         );
1528 #::logDebug("Importing mirrored $table_name from $tname");
1529
1530         $Vend::Database{$tname} =
1531                 Vend::Data::import_database($Vend::Cfg->{Database}{$tname})
1532                         unless $Vend::Database{$tname};
1533
1534         my $idb = Vend::Data::database_exists_ref($tname)
1535                 or die errmsg(
1536                                 "Memory mirror table %s does not exist (yet) to create mirror %s.\n",
1537                                 $tname,
1538                                 $table_name,
1539                         );
1540
1541         my @field_names = $idb->columns;
1542
1543         my $odb;
1544
1545         if($options->{ObjectType}) {
1546                 no strict 'refs';
1547                 $odb = &{"$options->{ObjectType}::create"}(
1548                                                                         $options->{ObjectType},
1549                                                                         $options,
1550                                                                         \@field_names,
1551                                                                         $table_name,
1552                                                                 );
1553         }
1554         else {
1555                 $odb = $options->{Object};
1556         }
1557
1558 #::logDebug("idb=$idb odb=$odb");
1559         eval {
1560                 my $f;
1561                 while($f = $idb->each_nokey($options->{MIRROR_QUAL})) {
1562 #::logDebug("importing key=$f->[0]");
1563                         $odb->set_row(@$f);
1564                 }
1565         };
1566
1567         if($@) {
1568                 die errmsg(
1569                                 "Problem with mirror import from source %s to target %s\n",
1570                                 $tname,
1571                                 $table_name,
1572                                 );
1573         }
1574         
1575         $odb->[$CONFIG]{Mirror_complete} = 1;
1576         delete $odb->[$CONFIG]{Clean_start};
1577     return $odb;
1578 }
1579
1580 my $white = ' \t';
1581
1582 sub read_quoted_fields {
1583     my ($filehandle) = @_;
1584     local ($_, $.);
1585     while(<$filehandle>) {
1586         s/[\r\n\cZ]+$//g;           # ms-dos cruft
1587         next if m/^[$white]*$/o;     # skip blank lines
1588         my @f = parse($_, $.);
1589 #::logDebug("read: '" . join("','", @f) . "'");
1590         return parse($_, $.);
1591     }
1592     return ();
1593 }
1594
1595 sub parse {
1596     local $_ = $_[0];
1597     my $linenum = $_[1];
1598
1599     my $expect = 1;
1600     my @a = ();
1601     my $x;
1602     while ($_ ne '') {
1603         if    (m# \A ([$white]+) (.*) #ox) { }
1604         elsif (m# \A (,[$white]*) (.*) #ox) {
1605             push @a, '' if $expect;
1606             $expect = 1;
1607         }
1608         elsif (m# \A ([^",$white] (?:[$white]* [^,$white]+)*) (.*) #ox) {
1609             push @a, $1;
1610             $expect = 0;
1611         }
1612         elsif (m# \A " ((?:[^"] | (?:""))*) " (?!") (.*) #x) {
1613             ($x = $1) =~ s/""/"/g;
1614             push @a, $x;
1615             $expect = 0;
1616         }
1617         elsif (m# \A " #x) {
1618             die "Unterminated quote at line $linenum\n";
1619         }
1620         else { die "Can't happen: '$_'" }
1621         $_ = $2;
1622     }
1623     $expect and push @a, '';
1624     return @a;
1625 }
1626
1627 sub reset {
1628         undef $restrict;
1629 }
1630
1631 sub errstr {
1632         return shift(@_)->[$CONFIG]{last_error};
1633 }
1634
1635 sub log_error {
1636         my ($s, $tpl, @args) = @_;
1637         if($tpl =~ /^(prepare|execute)$/) {
1638                 if(!@args) {
1639                         $tpl = "Statement $tpl failed: %s";
1640                 }
1641                 elsif (@args == 1) {
1642                         $tpl = "Statement $tpl failed: %s\nQuery was: %s";
1643                 }
1644                 else {
1645                         $tpl = "Statement $tpl failed: %s\nQuery was: %s";
1646                         $tpl .= "\nAdditional: %s" for (2 .. scalar(@args));
1647                 }
1648                 unshift @args, $DBI::errstr;
1649         }
1650         my $msg = errmsg($tpl, @args);
1651         my $ekey = 'table ' . $s->[$CONFIG]{name};
1652         my $cfg = $s->[$CONFIG];
1653         unless(defined $cfg->{LOG_ERROR_CATALOG} and ! $cfg->{LOG_ERROR_CATALOG}) {
1654                 logError($msg);
1655         }
1656         if($cfg->{LOG_ERROR_GLOBAL}) {
1657                 logGlobal($msg);
1658         }
1659         if($Vend::admin or ! defined($cfg->{LOG_ERROR_SESSION}) or $cfg->{LOG_ERROR_SESSION}) {
1660                 $Vend::Session->{errors} = {} unless CORE::ref($Vend::Session->{errors}) eq 'HASH';
1661                 $Vend::Session->{errors}{$ekey} = $msg;
1662         }
1663         die $msg if $cfg->{DIE_ERROR};
1664         return $cfg->{last_error} = $msg;
1665 }
1666
1667 sub new_filehandle {
1668         my $fh = shift;
1669         binmode($fh, ":utf8") if $::Variable->{MV_UTF8} || $Global::Variable->{MV_UTF8};
1670         return $fh;
1671 }
1672
1673 1;
1674
1675 __END__