/** \file apop_db.c	An easy front end to SQLite. Includes a few nice
features like a variance, skew, and kurtosis aggregator for SQL. */

/* Copyright (c) 2006--2009 by Ben Klemens.  Licensed under the GPLv2; see COPYING.  */

#include "apop_internal.h"

/** Here are where the options are initially set. See the \ref apop_opts_type
9 10 11
    documentation for details.
\ingroup all_public
*/
12 13 14 15 16
apop_opts_type apop_opts	= 
          { .verbose=1,
            .output_delimiter ="\t",       .input_delimiters = "|,\t", 
            .db_name_column = "row_names", .nan_string = "NaN", 
            .db_engine = '\0',             .db_user = "\0", 
17
            .db_pass = "\0",               .stop_on_warning = 'n',
18
            .log_file = NULL,
19
            .rng_seed = 479901,            .version = 1.0 };
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59

// Error-check helpers for the sqlite3_exec calls in this file. Each one expects two
// names to be in scope at the point of use: `err` (the char* error slot handed to
// sqlite3_exec) and `query` (a string used only for the message). When err is non-NULL
// they report "<query>: <err>" via Apop_stopif and leave the enclosing function.

// For functions returning int: returns 1 on error.
// NOTE(review): err is not released with sqlite3_free on this path.
#define ERRCHECK {Apop_stopif(err, return 1, 0, "%s: %s",query, err); }
// For functions returning a pointer: returns NULL on error.
// NOTE(review): err is not released with sqlite3_free on this path either.
#define ERRCHECK_NR {Apop_stopif(err, return NULL, 0, "%s: %s",query, err); }
// For functions returning an apop_data*: allocates outdata if it is still NULL, sets
// outdata->error to 'q', releases err with sqlite3_free, and returns outdata.
#define ERRCHECK_SET_ERROR(outdata) {Apop_stopif(err, if (!(outdata)) (outdata)=apop_data_alloc(); (outdata)->error='q'; sqlite3_free(err); return outdata, 0, "%s: %s",query, err); }

#include "apop_db_sqlite.c" // callback_t is defined here, btw.


#ifdef HAVE_MYSQL
//Let mysql have these.
#undef VERSION
#undef PACKAGE
#undef PACKAGE_NAME
#undef PACKAGE_STRING
#undef PACKAGE_TARNAME
#undef PACKAGE_VERSION
#undef PACKAGE_BUGREPORT
#include "apop_db_mysql.c"
#endif

// Choose the engine when apop_opts.db_engine is still unset: 'm' if the environment
// variable APOP_DB_ENGINE is "mysql" or "mariadb" (compared case-insensitively),
// and 's' (SQLite) in every other case, including when the variable is absent.
static void get_db_type(){
    char const *requested = getenv("APOP_DB_ENGINE");
    int wants_mysql = requested
                   && (strcasecmp(requested, "mysql") == 0 || strcasecmp(requested, "mariadb") == 0);
    apop_opts.db_engine = wants_mysql ? 'm' : 's';
}

//This macro declares the query string and fills it from the printf part of the call.
//Use it at the top of a variadic function whose last named parameter is fmt: it
//declares `char *query` and `va_list argp` in the caller's scope, formats the
//arguments into a newly allocated query via vasprintf, and echoes the query through
//Apop_notify at level 2. The caller owns query and must free() it.
//NOTE(review): if vasprintf reports failure (-1) the Apop_stopif action is empty, so
//execution continues and query is still passed to Apop_notify and to the caller.
#define Fillin(query, fmt)        \
    char *query;                  \
    va_list argp;                 \
	va_start(argp, fmt);          \
	Apop_stopif(vasprintf(&query, fmt, argp)==-1, , 0, "Trouble writing to a string."); \
	va_end(argp);                 \
	Apop_notify(2, "%s", query);

/** If you want to use a database on the hard drive instead of memory, then call this
once and only once before using any other database utilities.

With SQLite, if you want a disposable database which you won't use after the program
ends, don't bother with this function.

The trade-offs between an on-disk database and an in-memory db are as one would expect:
memory is faster, but the database is destroyed when the program exits.

MySQL users: either set the environment variable APOP_DB_ENGINE=mysql or set \c apop_opts.db_engine = 'm'.

The Apophenia package assumes you are only using a single database at a time. You
can use the SQL <tt>attach</tt> function to load other databases, or see <a
href="http://modelingwithdata.org/arch/00000142.htm">this blog post</a> for further
suggestions and sample code.

When you are done doing your database manipulations, call \ref apop_db_close if writing to disk.

\param filename
The name of a file on the hard drive on which to store the database. If
<tt>NULL</tt>, then the database will be kept in memory (in which case,
the other database functions will call this function for you and you
don't need to bother).

\li See \ref sqlsec for more notes on using databases.

\return 0: everything OK<br>
        1: database did not open.<br>
        -1: the MySQL engine was requested, but Apophenia was compiled without mysql support.
*/
int apop_db_open(char const *filename){
    //If the user has not picked an engine, check the environment.
    if (!apop_opts.db_engine) get_db_type();

    //Dispatch on the engine alone. Earlier, un-braced "is a handle already open?"
    //tests wrapped this branch by accident, so a request for MySQL could fall
    //through to the SQLite opener.
    if (apop_opts.db_engine == 'm') {
#ifdef HAVE_MYSQL
        return apop_mysql_db_open(filename);
#else
        Apop_stopif(1, return -1, 0, "Apophenia was compiled without mysql support.");
#endif
    }
    return apop_sqlite_db_open(filename);
}

/** \cond doxy_ignore */
//State shared between apop_table_exists and its sqlite3_exec callback.
typedef struct {
    char const *name;   //the table (or view) name being searched for
    int isthere;        //set to 1 by the callback once a row matching name is seen
} tab_exists_t;
/** \endcond */

//sqlite3_exec callback: compare the last column of the current row to te->name and
//record a match in te->isthere. Always returns 0 so that the scan continues.
//Rows with no columns, or with a NULL cell or NULL target name, never match.
static int tab_exists_callback(void *in, int argc, char **argv, char **whatever){
    tab_exists_t *te = in;
    (void) whatever; //column names are not needed here.
    if (argc < 1 || !argv || !argv[argc-1] || !te->name) return 0;
    if (!strcmp(argv[argc-1], te->name))
        te->isthere=1;
    return 0;
}

/** Check for the existence of a table, and maybe delete it.

Recreating a table which already exists can cause errors, so it is good practice to check for existence first.  Also, this is the stylish way to delete a table, since just calling <tt>"drop table"</tt> will give you an error if the table doesn't exist.

\param name 	the table name (no default)
\param remove 'd'	==>delete table so it can be recreated in main.<br>
		'n'	==>no action. Return result so program can continue. (default)
\return
0 = table does not exist<br>
1 = table was found, and if remove=='d', has been deleted
126
-1 = processing error
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211

\li In the SQLite engine, this function considers table views to be tables.
\li This function uses the \ref designated syntax for inputs.
*/
#ifdef APOP_NO_VARIADIC
int apop_table_exists(char const *name, char remove){
#else
apop_varad_head(int, apop_table_exists){
    char const *apop_varad_var(name, NULL)
    Apop_stopif(!name, return -1, 0, "You gave me a NULL table name.");
    char apop_varad_var(remove, 'n')
    return apop_table_exists_base(name, remove);
}

 int apop_table_exists_base(char const *name, char remove){
#endif
    if (!apop_opts.db_engine) get_db_type();
    if (apop_opts.db_engine == 'm')
#ifdef HAVE_MYSQL
        return apop_mysql_table_exists(name, remove);
#else
        Apop_stopif(1, return -1, 0, "Apophenia was compiled without mysql support.");
#endif
    char *err=NULL, *q2;
    tab_exists_t te = { .name = name };
    tab_exists_t tev = { .name = name };
	if (db==NULL) return 0;
	sqlite3_exec(db, "select name from sqlite_master where type='table'", tab_exists_callback, &te, &err); 
	sqlite3_exec(db, "select name from sqlite_master where type='view'", tab_exists_callback, &tev, &err); 
    char query[]="Selecting names from sqlite_master";//for ERRCHECK.
	ERRCHECK
	if ((remove==1|| remove=='d') && (te.isthere||tev.isthere)){
        if (te.isthere)
            Asprintf(&q2, "drop table %s;", name);
        else
            Asprintf(&q2, "drop view %s;", name);
		sqlite3_exec(db, q2, NULL, NULL, &err); 
        free(q2);
        ERRCHECK
    }
	return (te.isthere||tev.isthere);
}

/**
Closes the database on disk. If you opened the database with \c apop_db_open(NULL), then this is basically optional.

\param vacuum 
'v': vacuum---do clean-up to minimize the size of the database on disk.<br>
'q': Don't bother; just close the database. (default = 'q')

\return 0 on OK, nonzero on error.
\li This function uses the \ref designated syntax for inputs.
*/
#ifdef APOP_NO_VARIADIC
int apop_db_close(char vacuum){
#else
apop_varad_head(int, apop_db_close){
    char apop_varad_var(vacuum, 'q')
    return apop_db_close_base(vacuum);
}

 int apop_db_close_base(char vacuum){
#endif
    if (apop_opts.db_engine == 'm') //assume this is set by now...
#ifdef HAVE_MYSQL
        {apop_mysql_db_close(0);
        return 0;}
#else
        {Apop_stopif(1, return -1, 0, "Apophenia was compiled without mysql support.");}
#endif
    else {
        char *err, *query = "db close";//for errcheck.
        if (vacuum==1 || vacuum=='v') {
            sqlite3_exec(db, "VACUUM", NULL, NULL, &err);
            ERRCHECK
        }
        sqlite3_close(db);
    	//ERRCHECK
        db  = NULL;
    }
    return 0;
}

/** Send a query to the database that returns no data.

212 213 214
\li As with functions like the \c apop_query_to_data, the query can include
printf-style format specifiers, such as <tt>apop_query("create table %s(id, name,
age);", tablename)</tt>.
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244

\param fmt A <tt>printf</tt>-style SQL query.
\return 0 on success, 1 on failure.
*/
int apop_query(const char *fmt, ...){
    char *err=NULL;
    Fillin(query, fmt)
    if (!apop_opts.db_engine) get_db_type();
    if (apop_opts.db_engine == 'm')
#ifdef HAVE_MYSQL
        {Apop_stopif(!mysql_db, return 1, 0, "No mySQL database is open.");
        return apop_mysql_query(query);}
#else
        Apop_stopif(1, return 1, 0, "Apophenia was compiled without mysql support.");
#endif
    else 
        {if (!db) apop_db_open(NULL);
        sqlite3_exec(db, query, NULL,NULL, &err);
	    ERRCHECK
        }
	free(query);
	return 0;
}

/** Dump the results of a query into an array of strings.

\return	 An \ref apop_data structure with the <tt>text</tt> element filled.

\param fmt A <tt>printf</tt>-style SQL query.

\exception out->error=='q' The database engine was unable to run the query (e.g.,  invalid SQL syntax). A valid query that returns zero rows is not an error, and \c NULL is returned.
\exception out->error=='d' Database error.

\li If <tt>apop_opts.db_name_column</tt> matches a column of the output table, then that
    column is used for row names, and therefore will not be included in the <tt>text</tt>.
\li <tt>query_output->text</tt> is always a 2-D array of strings, even if the query
    returns a single column. In that case, use <tt>returned_tab->text[i][0]</tt> (or
    equivalently, <tt>*returned_tab->text[i]</tt>) to refer to row <tt>i</tt>.
\li If an element in the database is \c NULL, the corresponding cell in the output
    table will be filled with the text given by \c apop_opts.nan_string. The default
    is \c "NaN", but you can set <tt>apop_opts.nan_string = "whatever you like"</tt>
    to change the text to whatever you like.
\li Returns \c NULL if your query is valid but returns zero rows.
\li The query can include printf-style format specifiers, such as
    <tt>apop_query_to_text("select name from %s where id=%i;", tablename, id_number)</tt>.

For example, the following function will list the tables in an SQLite database (much like you
could do from the command line using <tt>sqlite3 dbname.db ".table"</tt>).

\include ls_tables.c
*/
apop_data * apop_query_to_text(const char * fmt, ...){
    apop_data *out = NULL;
    Fillin(query, fmt)
    if (!apop_opts.db_engine) get_db_type();
    if (apop_opts.db_engine == 'm'){
#ifdef HAVE_MYSQL
        out = apop_mysql_query_core(query, process_result_set_chars);
#else
        //Release the query text before bailing out; the message below does not use it.
        free(query); query = NULL;
        Apop_stopif(1, apop_return_data_error('d'), 0, "Apophenia was compiled without mysql support.");
#endif
    } else out = apop_sqlite_query_to_text(query);
    free(query);
    return out;
}

//apop_query_to_data callback, run by sqlite3_exec once per result row.
//qinfo points to a callback_t. On the first row the output apop_data is allocated and
//the column names are recorded; on later rows the matrix grows by one row. The column
//whose name matches apop_opts.db_name_column (if any) becomes the row name instead of
//a matrix entry. Cells that are NULL, the text "NULL", or apop_opts.nan_string are
//stored as GSL_NAN; everything else goes through atof.
static int db_to_table(void *qinfo, int argc, char **argv, char **column){
    Apop_stopif(!argv, return -1, apop_errorlevel, "Got NULL data from SQLite.");
    callback_t *state = qinfo;

    if (!state->firstcall) {
        if (state->outdata->matrix)
            apop_matrix_realloc(state->outdata->matrix, state->currentrow+1, state->outdata->matrix->size2);
    } else {
        state->firstcall--;
        //Look for the row-name column; only the first match counts.
        int has_namecol = 0;
        for (int c=0; c < argc; c++)
            if (apop_opts.db_name_column && !strcasecmp(column[c], apop_opts.db_name_column)){
                state->namecol = c;
                has_namecol = 1;
                break;
            }
        int data_cols = argc - has_namecol;
        state->outdata = data_cols ? apop_data_alloc(1, data_cols) : apop_data_alloc( );
        for (int c=0; c < argc; c++)
            if (c != state->namecol)
                apop_name_add(state->outdata->names, column[c], 'c');
    }

    //Matrix columns after the name column sit one slot to the left of their query column.
    int shift = 0;
    for (int c=0; c < argc; c++){
        char *cell = argv[c];
        if (c == state->namecol){
            apop_name_add(state->outdata->names, cell, 'r');
            shift = 1;
            continue;
        }
        int is_missing = !cell || !strcmp(cell, "NULL")
                      || (apop_opts.nan_string && !strcasecmp(apop_opts.nan_string, cell));
        double valor = is_missing ? GSL_NAN : atof(cell);
        gsl_matrix_set(state->outdata->matrix, state->currentrow, c-shift, valor);
    }
    state->currentrow++;
    return 0;
}

317
/** Queries the database and dumps the result into an \ref apop_data set.
318

319
\param fmt A <tt>printf</tt>-style SQL query.
320

321 322 323 324 325
\return If no rows are returned, \c NULL; else an \ref apop_data set with the data
in place. Most data will be in the \c matrix element of the output. Column names are
appropriately placed. If \ref apop_opts_type "apop_opts.db_name_column" matches one
of the fields in your query's output (default: \c row_names), then that column will
be used for row names (and therefore will not appear in the \c matrix).
326 327

\exception out->error=='q' Query error. A valid query that returns no rows is not an error; in that case, you get \c NULL.
328 329 330 331 332 333

\li The query can include printf-style
    format specifiers, such as <tt>apop_query_to_data("select age from %s where id=%i;",
    tablename, id_number)</tt>.
\li Blanks in the database (i.e., <tt> NULL</tt>s) and elements that match \ref
    apop_opts_type "apop_opts.nan_string" are filled with <tt>NAN</tt>s in the matrix.
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
*/ 
apop_data * apop_query_to_data(const char * fmt, ...){
    Fillin(query, fmt)
    if (!apop_opts.db_engine) get_db_type();
    if (apop_opts.db_engine == 'm')
#ifdef HAVE_MYSQL
        return apop_mysql_query_core(query, process_result_set_data);
#else
        Apop_stopif(1, apop_return_data_error('d'), 0, "Apophenia was compiled without mysql support.");
#endif

    //else
    char *err=NULL;
    callback_t qinfo = {.firstcall = 1, .namecol=-1};
	if (db==NULL) apop_db_open(NULL);
    sqlite3_exec(db, query,db_to_table,&qinfo, &err); 
    free (query);
    ERRCHECK_SET_ERROR(qinfo.outdata)
	return qinfo.outdata;
}


    /** \cond doxy_ignore */
//These used to do more, but I'll leave them as a macro anyway in case of future expansion.
//Store_settings declares a local int v holding the current apop_opts.verbose and then
//sets apop_opts.verbose to 0; Restore_settings writes v back. Use them as a pair, in
//the same scope, around a nested query call whose text has already been echoed.
//Note that Store_settings ends in a line continuation, so the blank line after it is
//part of the macro definition.
#define Store_settings  \
    int v = apop_opts.verbose; apop_opts.verbose=0;/*hack to prevent double-printing.*/ \

#define Restore_settings  \
    apop_opts.verbose=v;
    /** \endcond */

365
/** Queries the database and dumps the first column of the result into a \c gsl_vector.
366 367 368

\param fmt A <tt>printf</tt>-style SQL query.
\return	 A <tt>gsl_vector</tt> holding the first column of the returned matrix. Thus, if your query returns multiple lines, you will get no warning, and the function will return the first in the list.
369
\exception out->error=='q' Query error. A valid query that returns no rows is not an error; in that case, you get \c NULL.
370

371 372 373 374 375 376 377 378
\li Uses \ref apop_query_to_data internally, then throws away all but the first column
    of the matrix.
\li If \c apop_opts.db_name_column is set, then I'll ignore that column. It gets put
    into the names of the \ref apop_data set, and then thrown away when I look at only
    the \c gsl_matrix part of that set.
\li If the query returns zero rows of data or no columns, the function returns \c NULL.
\li The query can include printf-style format specifiers, such as <tt>apop_query_to_vector("select age from %s where id=%i;", tablename, id_number)</tt>.
*/
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
gsl_vector * apop_query_to_vector(const char * fmt, ...){
    Fillin(query, fmt)
    if (!apop_opts.db_engine) get_db_type();
    if (apop_opts.db_engine == 'm')
#ifdef HAVE_MYSQL
        return apop_mysql_query_core(query, process_result_set_vector);
#else
        Apop_stopif(1, return NULL, 0, "Apophenia was compiled without mysql support.");
#endif
    apop_data *d=NULL;
    gsl_vector *out;
	if (db==NULL) apop_db_open(NULL);
    Store_settings
	d	= apop_query_to_data("%s", query);
    Restore_settings
    Apop_stopif(!d, return NULL, 2, "Query [%s] turned up a blank table. Returning NULL.", query);
    //else:
    out = gsl_vector_alloc(d->matrix->size1);
	gsl_matrix_get_col(out, d->matrix, 0);
	apop_data_free(d);
    free(query);
	return out;
}

/** Queries the database, and dumps the result into a single double-precision floating point number.

\li This calls \ref apop_query_to_data and returns the (0,0)th element of the returned matrix. Thus, if your query returns multiple lines, you will get no warning, and the function will return the first in the list (which is not always well-defined; maybe use an <tt>order by</tt> clause in your query if you expect multiple lines).

407 408 409 410 411 412 413 414 415 416
\li If \c apop_opts.db_name_column is set, then I'll ignore that column. It gets put
    into the names of the \ref apop_data set, and then thrown away when I look at only
    the \c gsl_matrix element of that set.
\li If the query produces a blank table, returns \c NAN, and if
    <tt>apop_opts.verbose>=2</tt>, prints an error.
\li The query can include printf-style format specifiers, such as
    <tt>apop_query_to_float("select age from %s where id=%i;", tablename, id_number)</tt>.
\li If the query produces an error, returns \c NAN, and if <tt>apop_opts.verbose>=0</tt>,
    prints an error. If you need to distinguish between blank tables, NaNs in the data,
    and query errors, use \ref apop_query_to_data.
417 418

\param fmt A <tt>printf</tt>-style SQL query.
419 420
\return		A \c double, actually.
*/
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
double apop_query_to_float(const char * fmt, ...){
    double out;
    Fillin(query, fmt)
    if (!apop_opts.db_engine) get_db_type();
    if (apop_opts.db_engine == 'm'){
#ifdef HAVE_MYSQL
        out = apop_mysql_query_to_float(query);
#else
        Apop_stopif(1, return NAN, 0, "Apophenia was compiled without mysql support.");
#endif
    } else {
        apop_data *d=NULL;
        if (db==NULL) apop_db_open(NULL);
        Store_settings
        d = apop_query_to_data("%s", query);
        Restore_settings
        Apop_stopif(!d, return GSL_NAN, 2, "Query [%s] turned up a blank table. Returning NaN.", query);
        Apop_stopif(d->error, return GSL_NAN, 0, "Query [%s] failed. Returning NaN.", query);
        out	= apop_data_get(d);
        apop_data_free(d);
    }
    free(query);
	return out;
}

/** Query data to an \c apop_data set, but a mix of names, vectors, matrix elements, and text.

If you are querying to a matrix and maybe a name, use \c
apop_query_to_data (and set \ref apop_opts_type "apop_opts.db_name_column" if desired). If querying only text, use \ref apop_query_to_text. But
if your data is a mix of text and numbers, use this.

The first argument is a character string consisting of the letters \c nvmtw, one for each column of the SQL output, indicating whether the column is a name, vector, matrix column, text column, or weight vector. You can have only one \c n, one \c v, and one \c w. 

If the query produces more columns than there are elements in the column specification, then the remainder are dumped into the text section. If there are fewer columns produced than given in the spec, the additional elements will be allocated but not filled (i.e., they are uninitialized and will have garbage).


\param typelist A string consisting of the letters \c nvmtw. For example, if your query columns should go into a text column, the vector, the weights, and two matrix columns, this would be "tvwmm".
\param fmt A <tt>printf</tt>-style SQL query.
\exception out->error=='d' Dimension error. Your count of matrix parts didn't match what the query returned.
\exception out->error=='q' Query error. A valid query that returns no rows is not an error; in that case, you get \c NULL.

\li \ref apop_opts_type "apop_opts.db_name_column" is ignored.  Use the \c 'n' character
    to indicate the output column with row names.
\li As with the other \c apop_query_to_... functions, the query can include printf-style
    format specifiers, such as <tt>apop_query_to_mixed_data("tv", "select name, age from
    %s where id=%i", tablename, id_number)</tt>.
\li If the MySQL engine is selected but Apophenia was compiled without mysql support,
    a message is printed and \c NULL is returned.
*/
apop_data * apop_query_to_mixed_data(const char *typelist, const char * fmt, ...){
    Fillin(query, fmt)
    if (!apop_opts.db_engine) get_db_type();
    apop_data *out = NULL;
    if (apop_opts.db_engine == 'm') {
#ifdef HAVE_MYSQL
        out = apop_mysql_mixed_query(typelist, query);
#else
        Apop_notify(0, "Apophenia was compiled without mysql support.");
#endif
    } else
        out = apop_sqlite_multiquery(typelist, query);
    //Single exit, so the query text is released on every path.
    free(query);
    return out;
}

/* Convenience function for extending a string. 
 asprintf(%q, "%s and stuff", q);
 gives you a memory leak. This takes care of that.
 */
void qxprintf(char **q, char *format, ...){
    va_list ap;
    char *r = *q;
    va_start(ap, format);
    Apop_stopif(vasprintf(q, format, ap)==-1, , 0, "Trouble writing to a string.");
    va_end(ap);
    free(r);
}

//Append one numeric value to the SQL text in *q, preceded by the separator in *comma.
//NaN is written as NULL and the infinities as the quoted strings 'inf' and '-inf';
//any other value is written with %g. On return *comma is ',', so the next value
//appended to the same statement is separated from this one.
static void add_a_number (char **q, char *comma, double v){
    if (gsl_isnan(v))
        qxprintf(q,"%s%c NULL ", *q, *comma);
    //isinf only promises a nonzero result for an infinity; take the sign from v itself.
    else if (isinf(v) && v > 0)
        qxprintf(q,"%s%c  'inf'", *q, *comma);
    else if (isinf(v))
        qxprintf(q,"%s%c  '-inf' ", *q, *comma);
    else
        qxprintf(q,"%s%c %g ",*q ,*comma, v);
    *comma = ',';
}

//Insert every row of set through the prepared statement p_stmt.
//Placeholders are filled in this order: row name (if the set has any row names),
//vector, matrix columns, text columns, weights. A component that exists but has no
//entry for the current row, an empty row name, and a text cell that is NULL, empty,
//or equal to apop_opts.nan_string are all left unbound, i.e. inserted as NULL; their
//placeholders are still skipped so that later columns stay aligned.
//The statement is finalized before returning, on success and on failure alike.
//Returns 0 on success, -1 on a binding/reset error. A failed sqlite3_step is
//reported but does not stop the remaining rows.
static int run_prepared_statements(apop_data const *set, sqlite3_stmt *p_stmt){
#if SQLITE_VERSION_NUMBER < 3003009
     Apop_stopif(1, return -1, 0, "Attempting to use prepared statements, but using a version of SQLite that doesn't support them.");
#else
    Get_vmsizes(set) //firstcol, msize1, maxsize
    for (size_t row=0; row < maxsize; row++){
        size_t field =1;
        if (set->names && set->names->rowct){
            if (set->names->rowct > row && set->names->row[row] && strlen(set->names->row[row])) {
                Apop_stopif(sqlite3_bind_text(p_stmt, field, set->names->row[row], -1, SQLITE_TRANSIENT),
                        sqlite3_finalize(p_stmt); return -1, apop_errorlevel, 
                        "Something wrong with the row name for line %zu, [%s].\n" , row, set->names->row[row]);
            }
            field++; //one placeholder per row, bound or not.
        }
        if (set->vector){
            if (set->vector->size > row) {
                Apop_stopif(sqlite3_bind_double(p_stmt, field, apop_data_get(set, row, -1)),
                        sqlite3_finalize(p_stmt); return -1, apop_errorlevel, 
                        "Something wrong with the vector element on line %zu, [%g].\n" ,row,  apop_data_get(set, row, -1));
            }
            field++;
        }
        for (size_t col=0; col < msize2; col++, field++){
            if (msize1 <= row) continue; //past the end of the matrix: leave NULL.
            Apop_stopif(sqlite3_bind_double(p_stmt, field, apop_data_get(set, row, col)),
                    sqlite3_finalize(p_stmt); return -1, apop_errorlevel, 
                    "Something wrong with the matrix element %zu on line %zu, [%g].\n" ,col, row,  apop_data_get(set, row, col));
        }
        for (size_t col=0; col < set->textsize[1]; col++, field++){
            if (set->textsize[0] <= row) continue; //past the end of the text grid.
            char *cell = set->text[row][col];
            if (!cell || !strlen(cell) || (apop_opts.nan_string && !strcasecmp(apop_opts.nan_string, cell)))
                continue; //leave NULL and cleared
            Apop_stopif(sqlite3_bind_text(p_stmt, field, cell, -1, SQLITE_TRANSIENT),
                    sqlite3_finalize(p_stmt); return -1, apop_errorlevel, 
                    "Something wrong with a text element at row %zu, col %zu [%s].\n" , row, col, cell);
        }
        if (set->weights && set->weights->size > row) {
            Apop_stopif(sqlite3_bind_double(p_stmt, field, gsl_vector_get(set->weights, row)),
                    sqlite3_finalize(p_stmt); return -1, apop_errorlevel, 
                    "Something wrong with the weight element on line %zu, [%g].\n" ,row,  gsl_vector_get(set->weights, row));
        }
        int err = sqlite3_step(p_stmt);
        Apop_stopif(err != SQLITE_OK && err != SQLITE_DONE
                    , , 0, "prepared sqlite insert query gave error code %i.\n", err);
        Apop_stopif(sqlite3_reset(p_stmt), sqlite3_finalize(p_stmt); return -1, apop_errorlevel, "SQLite error.");
        Apop_stopif(sqlite3_clear_bindings(p_stmt), sqlite3_finalize(p_stmt); return -1, apop_errorlevel, "SQLite error."); //needed for NULLs
    }
    Apop_stopif(sqlite3_finalize(p_stmt)!=SQLITE_OK, return -1, apop_errorlevel, "SQLite error.");
    return 0;
#endif
}

556
//users are expected to call apop_data_print.
557 558 559 560 561
int apop_data_to_db(const apop_data *set, const char *tabname, const char output_append){
    Apop_stopif(!set, return -1, 1, "you sent me a NULL data set. Database table %s will not be created.", tabname);
    int	i,j; 
    char *q;
    char comma = ' ';
562
    int use_row = (apop_opts.db_name_column && strlen(apop_opts.db_name_column))  && set->names
563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579
                && ((set->matrix && set->names->rowct == set->matrix->size1)
                    || (set->vector && set->names->rowct == set->vector->size));

    if (!apop_opts.db_engine) get_db_type();
    if (apop_table_exists(tabname))
        Asprintf(&q, " ");
    else if (apop_opts.db_engine == 'm')
#ifdef HAVE_MYSQL
        if (((output_append =='a' || output_append =='A') && apop_table_exists(tabname)))
            Asprintf(&q, " ");
        else {
            Asprintf(&q, "create table %s (", tabname);
            if (use_row) {
                qxprintf(&q, "%s\n %s varchar(1000)", q, apop_opts.db_name_column);
                comma = ',';
            }
            if (set->vector){
580
                if(!set->names || !set->names->vector) 
581 582 583 584 585 586 587
                    qxprintf(&q, "%s%c\n vector double ", q, comma);
                else
                    qxprintf(&q, "%s%c\n %s double ", q,comma, set->names->vector);
                comma = ',';
            }
            if (set->matrix)
                for(i=0;i< set->matrix->size2; i++){
588
                    if(!set->names || set->names->colct <= i) 
589 590 591 592 593 594
                        qxprintf(&q, "%s%c\n c%i double ", q, comma,i);
                     else
                        qxprintf(&q, "%s%c\n %s  double ", q, comma, set->names->col[i]);
                    comma = ',';
                }
            for(i=0;i< set->textsize[1]; i++){
595
                if (!set->names || set->names->textct <= i)
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
                    qxprintf(&q, "%s%c\n tc%i varchar(1000) ", q, comma,i);
                else
                    qxprintf(&q, "%s%c\n %s  varchar(1000) ", q, comma, set->names->text[i]);
                comma = ',';
            }
            apop_query("%s); ", q);
            sprintf(q, " ");
        }
#else 
        Apop_stopif(1, return -1, apop_errorlevel, "Apophenia was compiled without mysql support.");
#endif
    else {
        if (db==NULL) apop_db_open(NULL);
        if (((output_append =='a' || output_append =='A') && apop_table_exists(tabname)) )
            Asprintf(&q, " ");
        else {
            Asprintf(&q, "create table %s (", tabname);
            if (use_row) {
                qxprintf(&q, "%s\n %s", q, apop_opts.db_name_column);
                comma = ',';
            }
            if (set->vector){
618
                if (!set->names || !set->names->vector) qxprintf(&q, "%s%c\n vector numeric", q, comma);
619 620 621 622 623
                else qxprintf(&q, "%s%c\n \"%s\"", q, comma, set->names->vector);
                comma = ',';
            }
            if (set->matrix)
                for(i=0;i< set->matrix->size2; i++){
624
                    if(!set->names || set->names->colct <= i) 	
625 626 627 628 629 630
                        qxprintf(&q, "%s%c\n c%i numeric", q, comma,i);
                    else			
                        qxprintf(&q, "%s%c\n \"%s\" numeric", q, comma, set->names->col[i]);
                    comma = ',';
                }
            for(i=0; i< set->textsize[1]; i++){
631
                if(!set->names || set->names->textct <= i) qxprintf(&q, "%s%c\n tc%i ", q, comma, i);
632 633 634 635 636 637 638 639 640 641 642
                else qxprintf(&q, "%s%c\n %s ", q, comma, set->names->text[i]);
                comma = ',';
            }
            if (set->weights) qxprintf(&q, "%s%c\n \"weights\" numeric", q, comma);
            qxprintf(&q,"%s);",q);
            apop_query("%s", q);
            qxprintf(&q," ");
        }
    }

    Get_vmsizes(set) //firstcol, msize2, maxsize
643
    int col_ct = (set->names ? !!set->names->rowct : 0) + set->textsize[1] + msize2 - firstcol + !!set->weights;
644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683
    Apop_stopif(!col_ct, return -1, 0, "Input data set has zero columns of data (no rownames, text, matrix, vector, or weights). I can't create a table like that, sorry.");
    if(apop_use_sqlite_prepared_statements(col_ct)){
        sqlite3_stmt *statement;
        Apop_stopif(
            apop_prepare_prepared_statements(tabname, col_ct, &statement), 
            return -1, 0, "Trouble preparing prepared statements.");
        Apop_stopif(
            run_prepared_statements(set, statement), 
            return -1, 0, "error in insertions.");
    } else {
        for(i=0; i< maxsize; i++){
            comma = ' ';
            qxprintf(&q, "%s \n insert into %s values(",q, tabname);
            if (use_row){
                char *fixed= prep_string_for_sqlite(0, set->names->row[i]);
                qxprintf(&q, "%s %s ",q, fixed);
                free(fixed);
                comma = ',';
            }
            if (set->vector)
               add_a_number (&q, &comma, gsl_vector_get(set->vector,i));
            if (set->matrix)
                for(j=0; j< set->matrix->size2; j++)
                   add_a_number (&q, &comma, gsl_matrix_get(set->matrix,i,j));
            for(j=0; j< set->textsize[1]; j++){
                char *fixed= prep_string_for_sqlite(0, set->text[i][j]);
                qxprintf(&q, "%s%c %s ",q, comma,fixed ? fixed : "''");
                free(fixed);
                comma = ',';
            }
            if (set->weights)
               add_a_number (&q, &comma, gsl_vector_get(set->weights,i));
            qxprintf(&q,"%s);",q);
            apop_query("%s", q); 
            q[0]='\0';
        }
    }
	free(q);
    return 0;
}