Correct [log type=error|debug] final newline behavior
[interchange.git] / lib / Vend / Table / Common.pm
1 # Vend::Table::Common - Common access methods for Interchange databases
2 #
3 # $Id: Common.pm,v 2.51 2008-05-26 02:30:04 markj Exp $
4 #
5 # Copyright (C) 2002-2008 Interchange Development Group
6 # Copyright (C) 1996-2002 Red Hat, Inc.
7 #
8 # This program was originally based on Vend 0.2 and 0.3
9 # Copyright 1995 by Andrew M. Wilcox <amw@wilcoxsolutions.com>
10 #
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 2 of the License, or
14 # (at your option) any later version.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public
22 # License along with this program; if not, write to the Free
23 # Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston,
24 # MA  02110-1301  USA.
25
26 $VERSION = substr(q$Revision: 2.51 $, 10);
27 use strict;
28
29 package Vend::Table::Common;
30 require Vend::DbSearch;
31 require Vend::TextSearch;
32 require Vend::CounterFile;
33 no warnings qw(uninitialized numeric);
34 use Symbol;
35 use Vend::Util;
36
37 use Exporter;
38 use vars qw($Storable $VERSION @EXPORT @EXPORT_OK);
39 @EXPORT = qw(create_columns import_ascii_delimited import_csv config columns);
40 @EXPORT_OK = qw(import_quoted read_quoted_fields);
41
42 use vars qw($FILENAME
43                         $COLUMN_NAMES
44                         $COLUMN_INDEX
45                         $KEY_INDEX
46                         $TIE_HASH
47                         $DBM
48                         $EACH
49                         $CONFIG);
50 (
51         $CONFIG,
52         $FILENAME,
53         $COLUMN_NAMES,
54         $COLUMN_INDEX,
55         $KEY_INDEX,
56         $TIE_HASH,
57         $DBM,
58         $EACH,
59         ) = (0 .. 7);
60
61 # See if we can do Storable
62 BEGIN {
63         eval {
64                 die unless $ENV{MINIVEND_STORABLE_DB};
65                 require Storable;
66                 $Storable = 1;
67         };
68 }
69
70 my @Hex_string;
71 {
72     my $i;
73     foreach $i (0..255) {
74         $Hex_string[$i] = sprintf("%%%02X", $i);
75     }
76 }
77
78 sub create_columns {
79         my ($columns, $config) = @_;
80         $config = {} unless $config;
81     my $column_index = {};
82         my $key;
83 #::logDebug("create_columns: " . ::uneval($config));
84
85         if($config->{KEY}) {
86                 $key = $config->{KEY};
87         }
88         elsif (! defined $config->{KEY_INDEX}) {
89                 $config->{KEY_INDEX} = 0;
90                 $config->{KEY} = $columns->[0];
91         }
92     my $i;
93         my $alias = $config->{FIELD_ALIAS} || {};
94 #::logDebug("field_alias: " . ::uneval($alias)) if $config->{FIELD_ALIAS};
95     for ($i = 0;  $i < @$columns;  ++$i) {
96         $column_index->{$columns->[$i]} = $i;
97                 defined $alias->{$columns->[$i]}
98                         and $column_index->{ $alias->{ $columns->[$i] } } = $i;
99                 next unless defined $key and $key eq $columns->[$i];
100                 $config->{KEY_INDEX} = $i;
101                 undef $key;
102 #::logDebug("set KEY_INDEX to $i: " . ::uneval($config));
103     }
104
105     die errmsg(
106                         "Cannot find key column %s in %s (%s): %s",
107                         $config->{KEY},
108                         $config->{name},
109                         $config->{file},
110                         $!,
111             ) unless defined $config->{KEY_INDEX};
112
113         return $column_index;
114 }
115
116 sub separate_definitions {
117         my ($options, $fields) = @_;
118         for(@$fields) {
119 #::logDebug("separating '$_'");
120                 next unless s/\s+(.*)//;
121 #::logDebug("needed separation: '$_'");
122                 my $def = $1;
123                 my $fn = $_;
124                 unless(defined $options->{COLUMN_DEF}{$fn}) {
125                         $options->{COLUMN_DEF}{$fn} = $def;
126                 }
127         }
128         return;
129 }
130
131 sub clear_lock {
132         my $s = shift;
133         return unless $s->[$CONFIG]{IC_LOCKING};
134         if($s->[$CONFIG]{_lock_handle}) {
135                 close $s->[$CONFIG]{_lock_handle};
136                 delete $s->[$CONFIG]{_lock_handle};
137         }
138 }
139
140 sub lock_table {
141         my $s = shift;
142         return unless $s->[$CONFIG]{IC_LOCKING};
143         my $lockhandle;
144         if(not $lockhandle = $s->[$CONFIG]{_lock_handle}) {
145                 my $lf = $s->[$CONFIG]{file} . '.lock';
146                 unless($lf =~ m{/}) {
147                         $lf = ($s->[$CONFIG]{dir} || $Vend::Cfg->{ProductDir}) . "/$lf";
148                 }
149                 $lockhandle = gensym;
150                 $s->[$CONFIG]{_lock_file} = $lf;
151                 $s->[$CONFIG]{_lock_handle} = $lockhandle;
152                 open $lockhandle, ">> $lf"
153                         or die errmsg("Cannot lock table %s (%s): %s", $s->[$CONFIG]{name}, $lf, $!);
154         }
155 #::logDebug("lock handle=$lockhandle");
156         Vend::Util::lockfile($lockhandle);
157 }
158
159 sub unlock_table {
160         my $s = shift;
161         return unless $s->[$CONFIG]{IC_LOCKING};
162         Vend::Util::unlockfile($s->[$CONFIG]{_lock_handle});
163 }
164
165 sub stuff {
166     my ($val) = @_;
167     $val =~ s,([\t\%]),$Hex_string[ord($1)],eg;
168     return $val;
169 }
170
171 sub unstuff {
172     my ($val) = @_;
173     $val =~ s,%(..),chr(hex($1)),eg;
174     return $val;
175 }
176
177 sub autonumber {
178         my $s = shift;
179         my $start;
180         my $cfg = $s->[$CONFIG];
181
182         return $s->autosequence() if $cfg->{AUTO_SEQUENCE};
183
184         return '' if not $start = $cfg->{AUTO_NUMBER};
185         local($/) = "\n";
186         my $c = $s->[$CONFIG];
187         if(! defined $c->{AutoNumberCounter}) {
188                 $c->{AutoNumberCounter} = new Vend::CounterFile
189                                                                         $cfg->{AUTO_NUMBER_FILE},
190                                                                         $start,
191                                                                         $c->{AUTO_NUMBER_DATE},
192                                                                         ;
193         }
194         my $num;
195         do {
196                 $num = $c->{AutoNumberCounter}->inc();
197         } while $s->record_exists($num);
198         return $num;
199 }
200
201 # These don't work in non-DBI databases
202 sub commit   { 1 }
203 sub rollback { 0 }
204
205 sub numeric {
206         return exists $_[0]->[$CONFIG]->{NUMERIC}->{$_[1]};
207 }
208
209 sub quote {
210         my($s, $value, $field) = @_;
211         return $value if $s->numeric($field);
212         $value =~ s/'/\\'/g;
213         return "'$value'";
214 }
215
216 sub config {
217         my ($s, $key, $value) = @_;
218         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
219         return $s->[$CONFIG]{$key} unless defined $value;
220         $s->[$CONFIG]{$key} = $value;
221 }
222
223 sub import_db {
224         my($s) = @_;
225         my $db = Vend::Data::import_database($s->[0], 1);
226         return undef if ! $db;
227         $Vend::Database{$s->[0]{name}} = $db;
228         Vend::Data::update_productbase($s->[0]{name});
229         if($db->[$CONFIG]{export_now}) {
230                 Vend::Data::export_database($db);
231                 delete $db->[$CONFIG]{export_now};
232         }
233         return $db;
234 }
235
236 sub close_table {
237     my ($s) = @_;
238         return 1 if ! defined $s->[$TIE_HASH];
239 #::logDebug("closing table $s->[$FILENAME]");
240         undef $s->[$DBM];
241         $s->clear_lock();
242     untie %{$s->[$TIE_HASH]}
243                 or $s->log_error("%s %s: %s", errmsg("untie"), $s->[$FILENAME], $!);
244         undef $s->[$TIE_HASH];
245 #::logDebug("closed table $s->[$FILENAME], self=" . ::uneval($s));
246 }
247
248 sub filter {
249         my ($s, $ary, $col, $filter) = @_;
250         my $column;
251         for(keys %$filter) {
252                 next unless defined ($column = $col->{$_});
253                 $ary->[$column] = Vend::Interpolate::filter_value(
254                                                                 $filter->{$_},
255                                                                 $ary->[$column],
256                                                                 $_,
257                                                   );
258         }
259 }
260
261 sub columns {
262     my ($s) = @_;
263         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
264     return @{$s->[$COLUMN_NAMES]};
265 }
266
267 sub column_exists {
268         return defined test_column(@_);
269 }
270
271 sub test_column {
272     my ($s, $column) = @_;
273         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
274     return $s->[$COLUMN_INDEX]{$column};
275 }
276
277 sub column_index {
278     my ($s, $column) = @_;
279         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
280     my $i = $s->[$COLUMN_INDEX]{$column};
281     die $s->log_error(
282                                 "There is no column named '%s' in %s",
283                                 $column,
284                                 $s->[$FILENAME],
285                         ) unless defined $i;
286     return $i;
287 }
288
289 *test_record = \&record_exists;
290
291 sub record_exists {
292     my ($s, $key) = @_;
293         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
294     my $r = $s->[$DBM]->EXISTS("k$key");
295     return $r;
296 }
297
298 sub name {
299         my ($s) = shift;
300         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
301         return $s->[$CONFIG]{name};
302 }
303
304 sub row_hash {
305     my ($s, $key) = @_;
306         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
307         return undef unless $s->record_exists($key);
308         my %row;
309     @row{ @{$s->[$COLUMN_NAMES]} } = $s->row($key);
310         return \%row;
311 }
312
313 sub unstuff_row {
314     my ($s, $key) = @_;
315         $s->lock_table() if $s->[$CONFIG]{IC_LOCKING};
316     my $line = $s->[$TIE_HASH]{"k$key"};
317         $s->unlock_table() if $s->[$CONFIG]{IC_LOCKING};
318     die $s->log_error(
319                                         "There is no row with index '%s' in database %s",
320                                         $key,
321                                         $s->[$FILENAME],
322                         ) unless defined $line;
323     return map(unstuff($_), split(/\t/, $line, 9999))
324                 unless $s->[$CONFIG]{FILTER_FROM};
325         my @f = map(unstuff($_), split(/\t/, $line, 9999));
326         $s->filter(\@f, $s->[$COLUMN_INDEX], $s->[$CONFIG]{FILTER_FROM});
327         return @f;
328 }
329
330 sub thaw_row {
331     my ($s, $key) = @_;
332         $s->lock_table() if $s->[$CONFIG]{IC_LOCKING};
333     my $line = $s->[$TIE_HASH]{"k$key"};
334         $s->unlock_table() if $s->[$CONFIG]{IC_LOCKING};
335     die $s->log_error( "There is no row with index '%s'", $key,)
336                 unless defined $line;
337     return (@{ Storable::thaw($line) })
338                 unless $s->[$CONFIG]{FILTER_FROM};
339 #::logDebug("filtering.");
340         my $f = Storable::thaw($line);
341         $s->filter($f, $s->[$COLUMN_INDEX], $s->[$CONFIG]{FILTER_FROM});
342         return @{$f};
343 }
344
345 sub field_accessor {
346     my ($s, $column) = @_;
347         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
348     my $index = $s->column_index($column);
349     return sub {
350         my ($key) = @_;
351         return ($s->row($key))[$index];
352     };
353 }
354
355 sub row_settor {
356     my ($s, @cols) = @_;
357         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
358         my @index;
359         my $key_idx = $s->[$KEY_INDEX] || 0;
360         #shift(@cols);
361         for(@cols) {
362         push @index, $s->column_index($_);
363         }
364 #::logDebug("settor index=@index");
365     return sub {
366         my (@vals) = @_;
367                 my @row;
368                 my $key = $vals[$key_idx];
369                 eval {
370                         @row = $s->row($key);
371                 };
372         @row[@index] = @vals;
373 #::logDebug("setting $key indices '@index' to '@vals'");
374         $s->set_row(@row);
375     };
376 }
377
378 sub get_slice {
379     my ($s, $key, $fary) = @_;
380         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
381
382         return undef unless $s->record_exists($key);
383
384         if(ref $fary ne 'ARRAY') {
385                 shift; shift;
386                 $fary = [ @_ ];
387         }
388
389         my @result = ($s->row($key))[ map { $s->column_index($_) } @$fary ];
390         return wantarray ? @result : \@result;
391 }
392
393 sub set_slice {
394         my ($s, $key, $fary, $vary) = @_;
395         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
396
397     if($s->[$CONFIG]{Read_only}) {
398                 $s->log_error(
399                         "Attempt to set slice of %s in read-only table %s",
400                         $key,
401                         $s->[$CONFIG]{name},
402                 );
403                 return undef;
404         }
405
406         my $opt;
407         if (ref ($key) eq 'ARRAY') {
408                 $opt = shift @$key;
409                 $key = shift @$key;
410         }
411         $opt = {}
412                 unless ref ($opt) eq 'HASH';
413
414         $opt->{dml} = 'upsert'
415                 unless defined $opt->{dml};
416
417         if(ref $fary ne 'ARRAY') {
418                 my $href = $fary;
419                 if(ref $href ne 'HASH') {
420                         $href = { splice (@_, 2) };
421                 }
422                 $vary = [ values %$href ];
423                 $fary = [ keys   %$href ];
424         }
425
426         my $keyname = $s->[$CONFIG]{KEY};
427
428         my ($found_key) = grep $_ eq $keyname, @$fary;
429
430         if(! $found_key) {
431                 unshift @$fary, $keyname;
432                 unshift @$vary, $key;
433         }
434
435         my @current;
436
437         if ($s->record_exists($key)) {
438                 if ($opt->{dml} eq 'insert') {
439                         $s->log_error(
440                                 "Duplicate key on set_slice insert for key '$key' on table %s",
441                                 $s->[$CONFIG]{name},
442                         );
443                         return undef;
444                 }
445                 @current = $s->row($key);
446         }
447         elsif ($opt->{dml} eq 'update') {
448                 $s->log_error(
449                         "No record to update set_slice for key '$key' on table %s",
450                         $s->[$CONFIG]{name},
451                 );
452                 return undef;
453         }
454
455         @current[ map { $s->column_index($_) } @$fary ] = @$vary;
456
457         $key = $s->set_row(@current);
458         length($key) or
459                 $s->log_error(
460                         "Did set_slice with empty key on table %s",
461                         $s->[$CONFIG]{name},
462                 );
463
464         return $key;
465 }
466
467 sub field_settor {
468     my ($s, $column) = @_;
469         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
470     my $index = $s->column_index($column);
471     return sub {
472         my ($key, $value) = @_;
473         my @row = $s->row($key);
474         $row[$index] = $value;
475         $s->set_row(@row);
476     };
477 }
478
479 sub clone_row {
480         my ($s, $old, $new) = @_;
481         return undef unless $s->record_exists($old);
482         my @ary = $s->row($old);
483         $ary[$s->[$KEY_INDEX]] = $new;
484         $s->set_row(@ary);
485         return $new;
486 }
487
488 sub clone_set {
489         my ($s, $col, $old, $new) = @_;
490         return unless $s->column_exists($col);
491         my $sel = $s->quote($old, $col);
492         my $name = $s->[$CONFIG]{name};
493         my ($ary, $nh, $na) = $s->query("select * from $name where $col = $sel");
494         my $fpos = $nh->{$col} || return undef;
495         $s->config('AUTO_NUMBER', '000001') unless $s->config('AUTO_NUMBER');
496         for(@$ary) {
497                 my $line = $_;
498                 $line->[$s->[$KEY_INDEX]] = '';
499                 $line->[$fpos] = $new;
500                 $s->set_row(@$line);
501         }
502         return $new;
503 }
504
505 sub stuff_row {
506     my ($s, @fields) = @_;
507         my $key = $fields[$s->[$KEY_INDEX]];
508
509 #::logDebug("stuff key=$key");
510         $fields[$s->[$KEY_INDEX]] = $key = $s->autonumber()
511                 if ! length($key);
512         $s->filter(\@fields, $s->[$COLUMN_INDEX], $s->[$CONFIG]{FILTER_TO})
513                 if $s->[$CONFIG]{FILTER_TO};
514         $s->lock_table();
515
516     $s->[$TIE_HASH]{"k$key"} = join("\t", map(stuff($_), @fields));
517
518         $s->unlock_table();
519         return $key;
520 }
521
522 sub freeze_row {
523     my ($s, @fields) = @_;
524         my $key = $fields[$s->[$KEY_INDEX]];
525 #::logDebug("freeze key=$key");
526         $fields[$s->[$KEY_INDEX]] = $key = $s->autonumber()
527                 if ! length($key);
528         $s->filter(\@fields, $s->[$COLUMN_INDEX], $s->[$CONFIG]{FILTER_TO})
529                 if $s->[$CONFIG]{FILTER_TO};
530         $s->lock_table();
531         $s->[$TIE_HASH]{"k$key"} = Storable::freeze(\@fields);
532         $s->unlock_table();
533         return $key;
534 }
535
536 if($Storable) {
537         *set_row = \&freeze_row;
538         *row = \&thaw_row;
539 }
540 else {
541         *set_row = \&stuff_row;
542         *row = \&unstuff_row;
543 }
544
545 sub foreign {
546     my ($s, $key, $foreign) = @_;
547         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
548         $key = $s->quote($key, $foreign);
549     my $q = "select $s->[$CONFIG]{KEY} from $s->[$CONFIG]{name} where $foreign = $key";
550 #::logDebug("foreign key query = $q");
551     my $ary = $s->query({ sql => $q });
552 #::logDebug("foreign key query returned" . ::uneval($ary));
553         return undef unless $ary and $ary->[0];
554         return $ary->[0][0];
555 }
556
557 sub field {
558     my ($s, $key, $column) = @_;
559         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
560     return ($s->row($key))[$s->column_index($column)];
561 }
562
563 sub set_field {
564     my ($s, $key, $column, $value) = @_;
565         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
566     if($s->[$CONFIG]{Read_only}) {
567                 $s->log_error(
568                         "Attempt to write %s in read-only table",
569                         "$s->[$CONFIG]{name}::${column}::$key",
570                 );
571                 return undef;
572         }
573     my @row;
574         if($s->record_exists($key)) {
575                 @row = $s->row($key);
576         }
577         else {
578                 $row[$s->[$KEY_INDEX]] = $key;
579         }
580     $row[$s->column_index($column)] = $value
581                 if $column;
582     $s->set_row(@row);
583         $value;
584 }
585
586 sub inc_field {
587     my ($s, $key, $column, $adder) = @_;
588         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
589     my($value);
590     if($s->[$CONFIG]{Read_only}) {
591                 $s->log_error(
592                         "Attempt to write %s in read-only table",
593                         "$s->[$CONFIG]{name}::${column}::$key",
594                 );
595                 return undef;
596         }
597     my @row = $s->row($key);
598         my $idx = $s->column_index($column);
599 #::logDebug("ready to increment key=$key column=$column adder=$adder idx=$idx row=" . ::uneval(\@row));
600     $value = $row[$s->column_index($column)] += $adder;
601     $s->set_row(@row);
602     return $value;
603 }
604
605 sub create_sql {
606     return undef;
607 }
608
609 sub touch {
610     my ($s) = @_;
611         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
612     my $now = time();
613     utime $now, $now, $s->[$FILENAME];
614 }
615
616 sub ref {
617         my $s = shift;
618         return $s if defined $s->[$TIE_HASH];
619         return $s->import_db();
620 }
621
622 sub sort_each {
623         my($s, $sort_field, $sort_option) = @_;
624         if(length $sort_field) {
625                 my $opt = {};
626                 $opt->{to} = $sort_option
627                         if $sort_option;
628                 $opt->{ml} = 99999;
629                 $opt->{st} = 'db';
630                 $opt->{tf} = $sort_field;
631                 $opt->{query} = "select * from $s->[$CONFIG]{name}";
632                 $s->[$EACH] = $s->query($opt);
633                 return;
634         }
635 }
636
637 sub each_sorted {
638         my $s = shift;
639         if(! defined $s->[$EACH][0]) {
640                 undef $s->[$EACH];
641                 return ();
642         }
643         my $k = $s->[$EACH][0][$s->[$KEY_INDEX]];
644         return ($k, @{shift @{ $s->[$EACH] } });
645 }
646
647 sub each_record {
648     my ($s) = @_;
649         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
650     my $key;
651
652         return $s->each_sorted() if defined $s->[$EACH];
653     for (;;) {
654         $key = each %{$s->[$TIE_HASH]};
655         if (defined $key) {
656             if ($key =~ s/^k//) {
657                 return ($key, $s->row($key));
658             }
659         }
660         else {
661             return ();
662         }
663     }
664 }
665
666 my $sup;
667 my $restrict;
668 my $rfield;
669 my $hfield;
670 my $rsession;
671
672 sub each_nokey {
673     my ($s, $qual) = @_;
674         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
675     my ($key, $hf);
676
677         if (! defined $restrict) {
678                 # Support hide_field
679                 if($qual) {
680 #::logDebug("Found qual=$qual");
681                         $hfield = $qual;
682                         if($hfield =~ s/^\s+WHERE\s+(\w+)\s*!=\s*1($|\s+)//) {
683                                 $hf = $1;
684 #::logDebug("Found hf=$hf");
685                                 $s->test_column($hf) and $hfield = $s->column_index($hf);
686                         }
687                         else {
688                                 undef $hfield;
689                         }
690
691 #::logDebug("hf index=$hfield");
692                 }
693                 if($restrict = ($Vend::Cfg->{TableRestrict}{$s->config('name')} || 0)) {
694 #::logDebug("restricted?");
695                 $sup =  ! defined $Global::SuperUserFunction
696                                         ||
697                                 $Global::SuperUserFunction->();
698                 if($sup) {
699                         $restrict = 0;
700                 }
701                 else {
702                         ($rfield, $rsession) = split /\s*=\s*/, $restrict;
703                         $s->test_column($rfield) and $rfield = $s->column_index($rfield)
704                                 or $restrict = 0;
705                         $rsession = $Vend::Session->{$rsession};
706                 }
707         }
708
709                 $restrict = 1 if $hfield and $s->[$CONFIG]{HIDE_FIELD} eq $hf;
710
711         }
712
713     for (;;) {
714         $key = each %{$s->[$TIE_HASH]};
715 #::logDebug("each_nokey: $key field=$rfield sup=$sup");
716                 if(! defined $key) {
717                         undef $restrict;
718                         return ();
719                 }
720                 $key =~ s/^k// or next;
721                 if($restrict) {
722                         my (@row) = $s->row($key);
723 #::logDebug("each_nokey: rfield='$row[$rfield]' eq '$rsession' ??") if defined $rfield;
724 #::logDebug("each_nokey: hfield='$row[$hfield]'") if defined $hfield;
725                         next if defined $hfield and $row[$hfield];
726                         next if defined $rfield and $row[$rfield] ne $rsession;
727                         return \@row;
728                 }
729                 return [ $s->row($key) ];
730     }
731 }
732
733 sub suicide { 1 }
734
735 sub isopen {
736         return defined $_[0]->[$TIE_HASH];
737 }
738
739 sub delete_record {
740     my ($s, $key) = @_;
741         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
742     if($s->[$CONFIG]{Read_only}) {
743                 $s->log_error(
744                         "Attempt to delete row '$key' in read-only table %s",
745                         $key,
746                         $s->[$CONFIG]{name},
747                 );
748                 return undef;
749         }
750
751 #::logDebug("delete row $key from $s->[$FILENAME]");
752     delete $s->[$TIE_HASH]{"k$key"};
753         1;
754 }
755
756 sub sprintf_substitute {
757         my ($s, $query, $fields, $cols) = @_;
758         return sprintf $query, @$fields;
759 }
760
761 sub hash_query {
762         my ($s, $query, $opt) = @_;
763         $opt ||= {};
764         $opt->{query} = $query;
765         $opt->{hashref} = 1;
766         return scalar $s->query($opt);
767 }
768
769 sub query {
770     my($s, $opt, $text, @arg) = @_;
771
772     if(! CORE::ref($opt)) {
773         unshift @arg, $text if defined $text;
774         $text = $opt;
775         $opt = {};
776     }
777
778         $s = $s->import_db() if ! defined $s->[$TIE_HASH];
779         $opt->{query} = $opt->{sql} || $text if ! $opt->{query};
780
781 #::logDebug("receieved query. object=" . ::uneval_it($opt));
782
783         if(defined $opt->{values}) {
784                 @arg = $opt->{values} =~ /['"]/
785                                 ? ( Text::ParseWords::shellwords($opt->{values})  )
786                                 : (grep /\S/, split /\s+/, $opt->{values});
787                 @arg = @{$::Values}{@arg};
788         }
789
790         if($opt->{type}) {
791                 $opt->{$opt->{type}} = 1 unless defined $opt->{$opt->{type}};
792         }
793
794         my $query;
795     $query = ! scalar @arg
796                         ? $opt->{query}
797                         : sprintf_substitute ($s, $opt->{query}, \@arg);
798
799         my $codename = defined $s->[$CONFIG]{KEY} ? $s->[$CONFIG]{KEY} : 'code';
800         my $ref;
801         my $relocate;
802         my $return;
803         my $spec;
804         my $stmt;
805         my $update = '';
806         my %nh;
807         my @na;
808         my @update_fields;
809         my @out;
810
811         if($opt->{STATEMENT}) {
812                  $stmt = $opt->{STATEMENT};
813                  $spec = $opt->{SPEC};
814 #::logDebug('rerouted. Command is ' . $stmt->command());
815         }
816         else {
817                 eval {
818                         ($spec, $stmt) = Vend::Scan::sql_statement($query, $opt);
819                 };
820                 if($@) {
821                         my $msg = errmsg("SQL query failed: %s\nquery was: %s", $@, $query);
822                         $s->log_error($msg);
823                         Carp::croak($msg) if $Vend::Try;
824                         return ($opt->{failure} || undef);
825                 }
826                 my @additions = grep length($_) == 2, keys %$opt;
827                 for(@additions) {
828                         next unless length $opt->{$_};
829                         $spec->{$_} = $opt->{$_};
830                 }
831         }
832         my @tabs = @{$spec->{rt} || $spec->{fi}};
833
834         my $reroute;
835         my $tname = $s->[$CONFIG]{name};
836         if ($tabs[0] ne $tname) {
837                 if("$tabs[0]_txt" eq $tname or "$tabs[0]_asc" eq $tname) {
838                         $tabs[0] = $spec->{fi}[0] = $tname;
839                 }
840                 else {
841                         $reroute = 1;
842                 }
843         }
844
845         if($reroute) {
846                 unless ($reroute = $Vend::Database{$tabs[0]}) {
847                         $s->log_error("Table %s not found in databases", $tabs[0]);
848                         return $opt->{failure} || undef;
849                 }
850                 $s = $reroute;
851 #::logDebug("rerouting to $tabs[0]");
852                 $opt->{STATEMENT} = $stmt;
853                 $opt->{SPEC} = $spec;
854                 return $s->query($opt, $text);
855         }
856
857 eval {
858
859         my @vals;
860         if($stmt->command() ne 'SELECT') {
861                 if(defined $s and $s->[$CONFIG]{Read_only}) {
862                         $s->log_error(
863                                         "Attempt to write read-only table %s",
864                                         $s->[$CONFIG]{name},
865                         );
866                         return undef;
867                 }
868                 $update = $stmt->command();
869                 @vals = $stmt->row_values();
870 #::logDebug("row_values returned=" . ::uneval(\@vals));
871         }
872
873
874         @na = @{$spec->{rf}}     if $spec->{rf};
875
876 #::logDebug("spec->{ml}=$spec->{ml} opt->{ml}=$opt->{ml}");
877         $spec->{ml} = $opt->{ml} if $opt->{ml};
878         $spec->{ml} ||= '1000';
879         $spec->{fn} = [$s->columns];
880
881         my $sub;
882
883         if($update eq 'INSERT') {
884                 if(! $spec->{rf} or  $spec->{rf}[0] eq '*') {
885                         @update_fields = @{$spec->{fn}};
886                 }
887                 else {
888                         @update_fields = @{$spec->{rf}};
889                 }
890 #::logDebug("update fields: " . uneval(\@update_fields));
891                 @na = $codename;
892                 $sub = $s->row_settor(@update_fields);
893         }
894         elsif($update eq 'UPDATE') {
895                 @update_fields = @{$spec->{rf}};
896 #::logDebug("update fields: " . uneval(\@update_fields));
897                 my $key = $s->config('KEY');
898                 @na = ($codename);
899                 $sub = sub {
900                                         my $key = shift;
901                                         $s->set_slice($key, [@update_fields], \@_);
902                                 };
903         }
904         elsif($update eq 'DELETE') {
905                 @na = $codename;
906                 $sub = sub { delete_record($s, @_) };
907         }
908         else {
909                 @na = @{$spec->{fn}}   if ! scalar(@na) || $na[0] eq '*';
910         }
911
912         $spec->{rf} = [@na];
913
914 #::logDebug("tabs='@tabs' columns='@na' vals='@vals' uf=@update_fields update=$update"); 
915
916     my $search;
917     if (! defined $opt->{st} or "\L$opt->{st}" eq 'db' ) {
918                 for(@tabs) {
919                         s/\..*//;
920                 }
921         $search = new Vend::DbSearch;
922 #::logDebug("created DbSearch object: " . ::uneval_it($search));
923         }
924         else {
925         $search = new Vend::TextSearch;
926 #::logDebug("created TextSearch object: " . ::uneval_it($search));
927     }
928
929         my %fh;
930         my $i = 0;
931         %nh = map { (lc $_, $i++) } @na;
932         $i = 0;
933         %fh = map { ($_, $i++) } @{$spec->{fn}};
934
935 #::logDebug("field hash: " . Vend::Util::uneval_it(\%fh)); 
936         for ( qw/rf sf/ ) {
937                 next unless defined $spec->{$_};
938                 map { $_ = $fh{$_} } @{$spec->{$_}};
939         }
940
941         if($update) {
942                 $opt->{row_count} = 1;
943                 die "Reached update query without object"
944                         if ! $s;
945 #::logDebug("Update operation is $update, sub=$sub");
946                 die "Bad row settor for columns @na"
947                         if ! $sub;
948                 if($update eq 'INSERT') {
949                         $sub->(@vals);
950                         $ref = [[ $vals[0] ]];
951                 }
952                 else {
953                         $ref = $search->array($spec);
954                         for(@$ref) {
955 #::logDebug("returned =" . uneval($_) . ", update values: " . uneval(\@vals));
956                                 $sub->($_->[0], @vals);
957                         }
958                 }
959         }
960         elsif ($opt->{hashref}) {
961                 $ref = $Vend::Interpolate::Tmp->{$opt->{hashref}} = $search->hash($spec);
962         }
963         else {
964 #::logDebug(    " \$Vend::Interpolate::Tmp->{$opt->{arrayref}}");
965                 $ref = $Vend::Interpolate::Tmp->{$opt->{arrayref} || ''}
966                          = $search->array($spec);
967                 $opt->{object} = $search;
968                 $opt->{prefix} = 'sql' unless defined $opt->{prefix};
969         }
970 };
971 #::logDebug("search spec: " . Vend::Util::uneval($spec));
972 #::logDebug("name hash: " . Vend::Util::uneval(\%nh));
973 #::logDebug("ref returned: " . substr(Vend::Util::uneval($ref), 0, 100));
974 #::logDebug("opt is: " . Vend::Util::uneval($opt));
975         if($@) {
976                 $s->log_error(
977                                 "MVSQL query failed for %s: %s\nquery was: %s",
978                                 $opt->{table},
979                                 $@,
980                                 $query,
981                         );
982                 $return = $opt->{failure} || undef;
983         }
984
985         if($opt->{search_label}) {
986                 $::Instance->{SearchObject}{$opt->{search_label}} = {
987                         mv_results => $ref,
988                         mv_field_names => \@na,
989                 };
990         }
991
992         if ($opt->{row_count}) {
993                 my $rc = $ref ? scalar @$ref : 0;
994                 return $rc unless $opt->{list};
995                 $ref = [ [ $rc ] ];
996                 @na = [ 'row_count' ];
997                 %nh = ( 'rc' => 0, 'count' => 0, 'row_count' => 0 );
998         }
999
1000         return Vend::Interpolate::tag_sql_list($text, $ref, \%nh, $opt, \@na)
1001                 if $opt->{list};
1002         return Vend::Interpolate::html_table($opt, $ref, \@na)
1003                 if $opt->{html};
1004         return Vend::Util::uneval($ref)
1005                 if $opt->{textref};
1006         return wantarray ? ($ref, \%nh, \@na) : $ref;
1007 }
1008
1009 *import_quoted = *import_csv = \&import_ascii_delimited;
1010
1011 my %Sort = (
1012
1013     ''  => sub { $a cmp $b              },
1014     none    => sub { $a cmp $b              },
1015     f   => sub { (lc $a) cmp (lc $b)    },
1016     fr  => sub { (lc $b) cmp (lc $a)    },
1017     n   => sub { $a <=> $b              },
1018     nr  => sub { $b <=> $a              },
1019     r   => sub { $b cmp $a              },
1020     rf  => sub { (lc $b) cmp (lc $a)    },
1021     rn  => sub { $b <=> $a              },
1022 );
1023
1024 my $fafh;
1025 sub file_access {
1026         my $function = shift;
1027         return <$fafh> 
1028 }
1029
1030 sub import_ascii_delimited {
1031     my ($infile, $options, $table_name) = @_;
1032         my ($format, $csv);
1033
1034         my $delimiter = quotemeta($options->{'delimiter'});
1035
1036         if  ($delimiter eq 'CSV') {
1037                 $csv = 1;
1038                 $format = 'CSV';
1039         }
1040         elsif ($options->{CONTINUE}) {
1041                 $format = uc $options->{CONTINUE};
1042         }
1043         else {
1044                 $format = 'NONE';
1045         }
1046
1047         my $realfile;
1048         if($options->{PRELOAD}) {
1049                 # do not preload if $infile is a scalar reference
1050                 if ($options->{scalar_ref} or 
1051                         (-f $infile and $options->{PRELOAD_EMPTY_ONLY})) {
1052                         # Do nothing, no preload
1053                 }
1054                 else {
1055                         $realfile = -f $infile ? $infile : '';
1056                         $infile = $options->{PRELOAD};
1057                         $infile = "$Global::VendRoot/$infile" if ! -f $infile;
1058                         ($infile = $realfile, undef $realfile) if ! -f $infile;
1059                 }
1060         }
1061
1062         if(! defined $realfile) {
1063                 if($options->{scalar_ref}){
1064                         open(IN, '+<', $infile)
1065                                 or die errmsg("%s %s: %s\n", errmsg("open scalar reference"), *$infile, $!);
1066                         # locking of scalar reference filehandles in unsupported
1067                 }
1068                 else{
1069                         open(IN, "+<$infile")
1070                                 or die errmsg("%s %s: %s\n", errmsg("open read/write"), $infile, $!);
1071                         lockfile(\*IN, 1, 1)
1072                                 or die errmsg("%s %s: %s\n", errmsg("lock"), $infile, $!);
1073                 }
1074
1075         }
1076         else {
1077                 open(IN, "<$infile")
1078                         or die errmsg("%s %s: %s\n", errmsg("open"), $infile, $!);
1079         }
1080
1081         new_filehandle(\*IN);
1082
1083         my $field_hash;
1084         my $para_sep;
1085         my $codere = '[\w-_#/.]+';
1086         my $idx = 0;
1087
1088         my($field_count, @field_names);
1089         
1090         if($options->{hs}) {
1091                 my $i = 0;
1092                 <IN> while $i++ < $options->{hs};
1093         }
1094         if($options->{field_names}) {
1095                 @field_names = @{$options->{field_names}};
1096
1097                 # This pulls COLUMN_DEF out of a field name
1098                 # remains in ASCII file, though
1099                 separate_definitions($options,\@field_names);
1100
1101                 if($options->{CONTINUE} eq 'NOTES') {
1102                         $para_sep = $options->{NOTES_SEPARATOR} ||$options->{SEPARATOR} || "\f";
1103                         $field_hash = {};
1104                         for(@field_names) {
1105                                 $field_hash->{$_} = $idx++;
1106                         }
1107                         $idx = $#field_names;
1108                 }
1109         }
1110         else {
1111                 my $field_names;
1112                 if ($csv) {
1113                         @field_names = read_quoted_fields(\*IN);
1114                 }
1115                 else {
1116                         $field_names = <IN>;
1117                         chomp $field_names;
1118                         $field_names =~ s/\s+$// unless $format eq 'NOTES';
1119                         @field_names = split(/$delimiter/, $field_names);
1120                 }
1121
1122                 # This pulls COLUMN_DEF out of a field name
1123                 # remains in ASCII file, though
1124                 separate_definitions($options,\@field_names);
1125
1126 #::logDebug("field names: @field_names");
1127                 if($format eq 'NOTES') {
1128                         $field_hash = {};
1129                         for(@field_names) {
1130                                 s/:.*//;        
1131                                 if(/\S[ \t]+/) {
1132                                         die "Only one notes field allowed in NOTES format.\n"
1133                                                 if $para_sep;
1134                                         $para_sep = $_;
1135                                         $_ = '';
1136                                 }
1137                                 else {
1138                                         $field_hash->{$_} = $idx++;
1139                                 }
1140                         }
1141                         my $msg;
1142                         @field_names = grep $_, @field_names;
1143                         $para_sep =~ s/($codere)[\t ]*(.)/$2/;
1144                         push(@field_names, ($1 || 'notes_field'));
1145                         $idx = $#field_names;
1146                         $para_sep = $options->{NOTES_SEPARATOR} || "\f";
1147                 }
1148         }
1149
1150         local($/) = "\n" . $para_sep ."\n"
1151                 if $para_sep;
1152
1153         $field_count = scalar @field_names;
1154
1155         no strict 'refs';
1156     my $out;
1157         if($options->{ObjectType}) {
1158                 $out = &{"$options->{ObjectType}::create"}(
1159                                                                         $options->{ObjectType},
1160                                                                         $options,
1161                                                                         \@field_names,
1162                                                                         $table_name,
1163                                                                 );
1164         }
1165         else {
1166                 $out = $options->{Object};
1167         }
1168
1169         if(! $out) {
1170                 die errmsg(q{No database object for table: %s
1171
1172 Probable mismatch of Database directive to database type,
1173 for example calling DBI without proper modules or database
1174 access.
1175 },
1176                                         $table_name,
1177                                         );
1178         }
1179         my $fields;
1180     my (@fields, $key);
1181         my @addl;
1182         my $excel = '';
1183         my $excel_addl = '';
1184
1185         if($options->{EXCEL}) {
1186         #Fix for quoted includes supplied by Larry Lesczynski
1187                 $excel = <<'EndOfExcel';
1188                         if(/"[^\t]*(?:,|"")/) {
1189                                 for (@fields) {
1190                                         next unless /[,"]/;
1191                                         s/^"//;
1192                                         s/"$//;
1193                                         s/""/"/g;
1194                                 }
1195                         }
1196 EndOfExcel
1197                 $excel_addl = <<'EndOfExcel';
1198                         if(/"[^\t]*(?:,|"")/) {
1199                                 for (@addl) {
1200                                         next unless /,/;
1201                                         s/^"//;
1202                                         s/"$//;
1203                                 }
1204                         }
1205 EndOfExcel
1206         }
1207         
1208         my $index = '';
1209         my @fh; # Array of file handles for sort
1210         my @fc; # Array of file handles for copy when symlink fails
1211         my @i;  # Array of field names for sort
1212         my @o;  # Array of sort options
1213         my %comma;
1214         if($options->{INDEX} and ! $options->{NO_ASCII_INDEX}) {
1215                 my @f; my $f;
1216                 my @n;
1217                 my $i;
1218                 @f = @{$options->{INDEX}};
1219                 foreach $f (@f) {
1220                         my $found = 0;
1221                         $i = 0;
1222                         if( $f =~ s/:(.*)//) {
1223                                 my $option = $1;
1224                                 push @o, $1;
1225                         }
1226                         elsif (exists $options->{INDEX_OPTIONS}{$f}) {
1227
1228                                 push @o, $options->{INDEX_OPTIONS}{$f};
1229                         }
1230                         else {
1231                                 push @o, '';
1232                         }
1233                         for(@field_names) {
1234                                 if($_ eq $f) {
1235                                         $found++;
1236                                         push(@i, $i);
1237                                         push(@n, $f);
1238                                         last;
1239                                 }
1240                                 $i++;
1241                         }
1242                         (pop(@o), next) unless $found;
1243                 }
1244                 if(@i) {
1245                         require IO::File;
1246                         my $fh;
1247                         my $f_string = join ",", @i;
1248                         @f = ();
1249                         for($i = 0; $i < @i; $i++) {
1250                                 my $fnum = $i[$i];
1251                                 $fh = new IO::File "> $infile.$i[$i]";
1252                                 die errmsg("%s %s: %s\n", errmsg("create"), "$infile.$i[$i]",
1253                                 $!) unless defined $fh;
1254
1255                                 new_filehandle($fh);
1256
1257                                 eval {
1258                                         unlink "$infile.$n[$i]" if -l "$infile.$n[$i]";
1259                                         symlink "$infile.$i[$i]", "$infile.$n[$i]";
1260                                 };
1261                                 push @fc, ["$infile.$i[$i]", "$infile.$n[$i]"]
1262                                         if $@;
1263                                 push @fh, $fh;
1264                                 if($o[$i] =~ s/c//) {
1265                                         $index .= <<EndOfIndex;
1266                         map { print { \$fh[$i] } "\$_\\t\$fields[0]\\n" } split /\\s*,\\s*/, \$fields[$fnum];
1267 EndOfIndex
1268                                 }
1269                                 elsif($o[$i] =~ s/s//) {
1270                                         $index .= <<EndOfIndex;
1271                         map { print { \$fh[$i] } "\$_\\t\$fields[0]\\n" } split /\\s*;\\s*/, \$fields[$fnum];
1272 EndOfIndex
1273                                 }
1274                                 else {
1275                                         $index .= <<EndOfIndex;
1276                         print { \$fh[$i] } "\$fields[$fnum]\\t\$fields[0]\\n";
1277 EndOfIndex
1278                                 }
1279                         }
1280                 }
1281         }
1282
1283         my $numeric_guess = '';
1284         my $numeric_clean = '';
1285         my %non_numeric;
1286         my @empty;
1287         my @possible;
1288         my $clean;
1289
1290         if($options->{GUESS_NUMERIC} and $options->{type} ne '8') {
1291                 @possible = (0 .. $#field_names);
1292                 @empty = map { 1 } (0 .. $#field_names);
1293                 
1294                 $numeric_guess = <<'EOF';
1295                         for (@possible) {
1296                                 ($empty[$_] = 0, next) if $fields[$_] =~ /^-?\d+\.?\d*$/;
1297                                 next if $empty[$_] && ! length($fields[$_]);
1298                                 $empty[$_] = undef;
1299                                 $clean = 1;
1300                                 $non_numeric{$_} = 1;
1301                         }
1302 EOF
1303                 $numeric_clean = <<'EOF';
1304                         next unless $clean;
1305                         undef $clean;
1306                         @possible = grep ! $non_numeric{$_}, @possible;
1307                         %non_numeric = ();
1308 EOF
1309         }
1310
1311 my %format = (
1312
1313         NOTES => <<EndOfRoutine,
1314         while (<IN>) {
1315             chomp;
1316                         \@fields = ();
1317                         s/\\r?\\n\\r?\\n((?s:.)*)//
1318                                 and \$fields[$idx] = \$1;
1319
1320                         while(s!($codere):[ \\t]*(.*)\\n?!!) {
1321                                 next unless defined \$field_hash->{\$1};
1322                                 \$fields[\$field_hash->{\$1}] = \$2;
1323                         }
1324                         $index
1325                         $numeric_guess
1326             \$out->set_row(\@fields);
1327                         $numeric_clean
1328         }
1329 EndOfRoutine
1330
1331         LINE => <<EndOfRoutine,
1332         while (<IN>) {
1333             chomp;
1334                         \$fields = \@fields = split(/$delimiter/, \$_, $field_count);
1335                         $index
1336                         push (\@fields, '') until \$fields++ >= $field_count;
1337                         $numeric_guess
1338             \$out->set_row(\@fields);
1339                         $numeric_clean
1340         }
1341 EndOfRoutine
1342
1343         CSV => <<EndOfRoutine,
1344                 while (\@fields = read_quoted_fields(\\*IN)) {
1345             \$fields = scalar \@fields;
1346                         $index
1347             push (\@fields, '') until \$fields++ >= $field_count;
1348                         $numeric_guess
1349             \$out->set_row(\@fields);
1350                         $numeric_clean
1351         }
1352 EndOfRoutine
1353
1354         NONE => <<EndOfRoutine,
1355         while (<IN>) {
1356             chomp;
1357             \$fields = \@fields = split(/$delimiter/, \$_, 99999);
1358                         $excel
1359                         $index
1360             push (\@fields, '') until \$fields++ >= $field_count;
1361                         $numeric_guess
1362             \$out->set_row(\@fields);
1363                         $numeric_clean
1364         }
1365 EndOfRoutine
1366
1367         UNIX => <<EndOfRoutine,
1368         while (<IN>) {
1369             chomp;
1370                         if(s/\\\\\$//) {
1371                                 \$_ .= <IN>;
1372                                 redo;
1373                         }
1374                         elsif (s/<<(\\w+)\$//) {
1375                                 my \$mark = \$1;
1376                                 my \$line = \$_;
1377                                 \$line .= Vend::Config::read_here(\\*IN, \$mark);
1378                                 \$_ = \$line;
1379                                 redo;
1380                         }
1381
1382             \$fields = \@fields = split(/$delimiter/, \$_, 99999);
1383                         $excel
1384                         $index
1385             push (\@fields, '') until \$fields++ >= $field_count;
1386                         $numeric_guess
1387             \$out->set_row(\@fields);
1388                         $numeric_clean
1389         }
1390 EndOfRoutine
1391
1392         DITTO => <<EndOfRoutine,
1393         while (<IN>) {
1394             chomp;
1395                         if(/^$delimiter/) {
1396                                 \$fields = \@addl = split /$delimiter/, \$_, 99999;
1397                                 shift \@addl;
1398                                 $excel_addl
1399                                 my \$i;
1400                                 for(\$i = 0; \$i < \@addl; \$i++) {
1401                                         \$fields[\$i] .= "\n\$addl[\$i]"
1402                                                 if \$addl[\$i] ne '';
1403                                 }
1404                         }
1405                         else {
1406                                 \$fields = \@fields = split(/$delimiter/, \$_, 99999);
1407                                 $excel
1408                                 $index
1409                                 push (\@fields, '') until \$fields++ >= $field_count;
1410                         }
1411                         $numeric_guess
1412             \$out->set_row(\@fields);
1413                         $numeric_clean
1414         }
1415 EndOfRoutine
1416
1417 );
1418
1419     eval $format{$format};
1420         die errmsg("%s import into %s failed: %s", $options->{name}, $options->{table}, $@) if $@;
1421     if($realfile) {
1422                 close IN
1423                         or die errmsg("%s %s: %s\n", errmsg("close"), $infile, $!);
1424                 if(-f $realfile) {
1425                         open(IN, "+<$realfile")
1426                                 or die
1427                                         errmsg("%s %s: %s\n", errmsg("open read/write"), $realfile, $!);
1428                         lockfile(\*IN, 1, 1)
1429                                 or die errmsg("%s %s: %s\n", errmsg("lock"), $realfile, $!);
1430                         new_filehandle(\*IN);
1431                         <IN>;
1432                         eval $format{$format};
1433                         die errmsg("%s %s: %s\n", errmsg("import"), $options->{name}, $!) if $@;
1434                 }
1435                 elsif (! open(IN, ">$realfile") && new_filehandle(\*IN) ) {
1436                                 die errmsg("%s %s: %s\n", errmsg("create"), $realfile, $!);
1437                 } 
1438                 else {
1439                         print IN join($options->{DELIMITER}, @field_names);
1440                         print IN $/;
1441                         close IN;
1442                 }
1443         }
1444         if(@fh) {
1445                 my $no_sort;
1446                 my $sort_sub;
1447                 my $ftest = Vend::Util::catfile($Vend::Cfg->{ScratchDir}, 'sort.test');
1448                 my $cmd = "echo you_have_no_sort_but_we_will_cope | sort -f -n -o $ftest";
1449                 system $cmd;
1450                 $no_sort = 1 if ! -f $ftest;
1451                 
1452                 my $fh;
1453                 my $i;
1454                 for ($i = 0; $i < @fh; $i++) {
1455                         close $fh[$i] or die "close: $!";
1456                         unless ($no_sort) {
1457                                 $o[$i] = "-$o[$i]" if $o[$i];
1458                                 $cmd = "sort $o[$i] -o $infile.$i[$i] $infile.$i[$i]";
1459                                 system $cmd;
1460                         }
1461                         else {
1462                                 $fh = new IO::File "$infile.$i[$i]";
1463                                 new_filehandle($fh);
1464                                 my (@lines) = <$fh>;
1465                                 close $fh or die "close: $!";
1466                                 my $option = $o[$i] || 'none';
1467                                 @lines = sort { &{$Sort{$option}} } @lines;
1468                                 $fh = new IO::File ">$infile.$i[$i]";
1469                                 new_filehandle($fh);
1470                                 print $fh @lines;
1471                                 close $fh or die "close: $!";
1472                         }
1473                 }
1474         }
1475         if(@fc) {
1476                 require File::Copy;
1477                 for(@fc) {
1478                         File::Copy::copy(@{$_});
1479                 }
1480         }
1481
1482         unless($options->{no_commit}) {
1483                 $out->commit() if $out->config('HAS_TRANSACTIONS');
1484         }
1485         delete $out->[$CONFIG]{Clean_start};
1486         delete $out->[$CONFIG]{_Dirty};
1487         unless($options->{scalar_ref}){
1488                 unlockfile(\*IN) or die "unlock\n";
1489         }
1490     close(IN);
1491         my $dot = $out->[$CONFIG]{HIDE_AUTO_FILES} ? '.' : '';
1492         if($numeric_guess) {
1493                 my $fn = Vend::Util::catfile($out->[$CONFIG]{DIR}, "$dot$out->[$CONFIG]{file}");
1494                 Vend::Util::writefile(
1495                                         ">$fn.numeric",
1496                                         join " ", map { $field_names[$_] } @possible,
1497                 );
1498         }
1499     return $out;
1500 }
1501
1502 sub import_from_ic_db {
1503     my ($infile, $options, $table_name) = @_;
1504
1505         my $tname = $options->{MIRROR}
1506                 or die errmsg(
1507                                 "Memory mirror table not specified for table %s.",
1508                                 $table_name,
1509                         );
1510 #::logDebug("Importing mirrored $table_name from $tname");
1511
1512         $Vend::Database{$tname} =
1513                 Vend::Data::import_database($Vend::Cfg->{Database}{$tname})
1514                         unless $Vend::Database{$tname};
1515
1516         my $idb = Vend::Data::database_exists_ref($tname)
1517                 or die errmsg(
1518                                 "Memory mirror table %s does not exist (yet) to create mirror %s.\n",
1519                                 $tname,
1520                                 $table_name,
1521                         );
1522
1523         my @field_names = $idb->columns;
1524
1525         my $odb;
1526
1527         if($options->{ObjectType}) {
1528                 no strict 'refs';
1529                 $odb = &{"$options->{ObjectType}::create"}(
1530                                                                         $options->{ObjectType},
1531                                                                         $options,
1532                                                                         \@field_names,
1533                                                                         $table_name,
1534                                                                 );
1535         }
1536         else {
1537                 $odb = $options->{Object};
1538         }
1539
1540 #::logDebug("idb=$idb odb=$odb");
1541         eval {
1542                 my $f;
1543                 while($f = $idb->each_nokey($options->{MIRROR_QUAL})) {
1544 #::logDebug("importing key=$f->[0]");
1545                         $odb->set_row(@$f);
1546                 }
1547         };
1548
1549         if($@) {
1550                 die errmsg(
1551                                 "Problem with mirror import from source %s to target %s\n",
1552                                 $tname,
1553                                 $table_name,
1554                                 );
1555         }
1556         
1557         $odb->[$CONFIG]{Mirror_complete} = 1;
1558         delete $odb->[$CONFIG]{Clean_start};
1559     return $odb;
1560 }
1561
1562 my $white = ' \t';
1563
1564 sub read_quoted_fields {
1565     my ($filehandle) = @_;
1566     local ($_, $.);
1567     while(<$filehandle>) {
1568         s/[\r\n\cZ]+$//g;           # ms-dos cruft
1569         next if m/^[$white]*$/o;     # skip blank lines
1570         my @f = parse($_, $.);
1571 #::logDebug("read: '" . join("','", @f) . "'");
1572         return parse($_, $.);
1573     }
1574     return ();
1575 }
1576
1577 sub parse {
1578     local $_ = $_[0];
1579     my $linenum = $_[1];
1580
1581     my $expect = 1;
1582     my @a = ();
1583     my $x;
1584     while ($_ ne '') {
1585         if    (m# \A ([$white]+) (.*) #ox) { }
1586         elsif (m# \A (,[$white]*) (.*) #ox) {
1587             push @a, '' if $expect;
1588             $expect = 1;
1589         }
1590         elsif (m# \A ([^",$white] (?:[$white]* [^,$white]+)*) (.*) #ox) {
1591             push @a, $1;
1592             $expect = 0;
1593         }
1594         elsif (m# \A " ((?:[^"] | (?:""))*) " (?!") (.*) #x) {
1595             ($x = $1) =~ s/""/"/g;
1596             push @a, $x;
1597             $expect = 0;
1598         }
1599         elsif (m# \A " #x) {
1600             die "Unterminated quote at line $linenum\n";
1601         }
1602         else { die "Can't happen: '$_'" }
1603         $_ = $2;
1604     }
1605     $expect and push @a, '';
1606     return @a;
1607 }
1608
1609 sub reset {
1610         undef $restrict;
1611 }
1612
1613 sub errstr {
1614         return shift(@_)->[$CONFIG]{last_error};
1615 }
1616
1617 sub log_error {
1618         my ($s, $tpl, @args) = @_;
1619         if($tpl =~ /^(prepare|execute)$/) {
1620                 if(!@args) {
1621                         $tpl = "Statement $tpl failed: %s";
1622                 }
1623                 elsif (@args == 1) {
1624                         $tpl = "Statement $tpl failed: %s\nQuery was: %s";
1625                 }
1626                 else {
1627                         $tpl = "Statement $tpl failed: %s\nQuery was: %s";
1628                         $tpl .= "\nAdditional: %s" for (2 .. scalar(@args));
1629                 }
1630                 unshift @args, $DBI::errstr;
1631         }
1632         my $msg = errmsg($tpl, @args);
1633         my $ekey = 'table ' . $s->[$CONFIG]{name};
1634         my $cfg = $s->[$CONFIG];
1635         unless(defined $cfg->{LOG_ERROR_CATALOG} and ! $cfg->{LOG_ERROR_CATALOG}) {
1636                 logError($msg);
1637         }
1638         if($cfg->{LOG_ERROR_GLOBAL}) {
1639                 logGlobal($msg);
1640         }
1641         if($Vend::admin or ! defined($cfg->{LOG_ERROR_SESSION}) or $cfg->{LOG_ERROR_SESSION}) {
1642                 $Vend::Session->{errors} = {} unless CORE::ref($Vend::Session->{errors}) eq 'HASH';
1643                 $Vend::Session->{errors}{$ekey} = $msg;
1644         }
1645         die $msg if $cfg->{DIE_ERROR};
1646         return $cfg->{last_error} = $msg;
1647 }
1648
1649 sub new_filehandle {
1650         my $fh = shift;
1651         binmode($fh, ":utf8") if $::Variable->{MV_UTF8} || $Global::Variable->{MV_UTF8};
1652         return $fh;
1653 }
1654
1655 1;
1656
1657 __END__