Data Server Variants

From PeformIQ Upgrade
Jump to navigation Jump to search

1.1.3

#!/usr/bin/env python
#
#  Purpose: Threaded data server implementation
#
#  $Id:$
#
#---------------------------------------------------------------------

"""
  Threaded server model

  Server side: open a socket on a port, listen for
  a message from a client, and accept a request and
  service it.

  The server spawns a thread to handle each client connection.
  Threads share global memory space with main thread;
  This is more portable than fork -- not yet on Windows;

  This version has been extended to use the standard Python
  logging module.

  Add the delimiter to the INI file to allow use of alternate
  delimiters in transmitted data - so data with embedded commas
  can be used.
"""
#---------------------------------------------------------------------

import os
import csv
import sys
import getopt
import thread
import time
import signal
import logging

#---------------------------------------------------------------------

from socket   import *          # get socket constructor and constants
from datetime import datetime

#---------------------------------------------------------------------

# Version string reported by the -V command line option.
__version__   = "1.1.3"
# SCCS-style identification string; the %s placeholder is never filled in
# by the code visible in this file.
__id__        = "@(#)  dserver.py  [%s]  30/04/2008"

# Command line switches; main() sets these from the getopt options.
check_flg     = False
daemon_flg    = False
silent_flg    = False
terminate_flg = False
verbose_flg   = False
wait_flg      = False

# Incremented once for every -d on the command line.
debug_level   = 0

HOST          = ''             #  Bind address - '' means all local interfaces (INADDR_ANY)
PORT          = 9578           #  Default listen port; read_config() overrides it from Port=

# Listening socket, created by init_connection().
sockobj       = None

# Directory settings; filled in by main().
dserver_dir   = None
data_dir      = None
pid_path      = None

# File names; main() and read_config() prefix the first and last with data_dir.
CONFIGFILE    = "dserver.ini"
LOGFILE       = "dserver.log"
PIDFILE       = "dserver.pid"

# Table objects in registration order; a client handle is an index into it.
tables        = []

# Not referenced by the code visible in this file.
INVALID       = "INVALID"

# logging.Logger instance, created by init(); None until then.
log           = None

#=====================================================================

class Group:
   """A named list of rows plus a cursor (Idx) to the next row to serve."""

   # Class-level defaults; every instance overrides them in __init__.
   Name     = None
   Idx      = None
   Data     = None

   def __init__(self, name):
      """Create an empty group called *name* with its cursor at row 0."""
      self.Name, self.Idx, self.Data = name, 0, []

   def __str__(self):
      """Return a short summary: the group name and its row count."""
      return "Grp %s  Len %d" % (self.Name, len(self.Data))

   def append(self, s):
      """Add row *s* to the end of the group."""
      self.Data.append(s)

   def set(self):
      """Reset the cursor: 0 when the group has rows, -1 when it is empty."""
      self.Idx = 0 if len(self.Data) > 0 else -1

#---------------------------------------------------------------------

class Table:
   """One data table backed by files in the current working directory.

   For a table called NAME the class uses three files:

     NAME.dat     - the table content, read at construction time
     NAME.used    - append-only log of values handed out to clients
     NAME.stored  - append-only log of values stored by clients

   The layout of NAME.dat depends on the table type:

     CSV       one row per line; Data is a list, Idx the next row to serve
     Sequence  a single integer; Data is that integer
     Indexed   'key:value' lines; Data is a dict of strings
     Keyed     '[group]' headers followed by rows; Data is a dict of Group

   A table whose type is not one of the above, or whose reader returned
   no rows, is constructed but left with Valid == False.
   """

   Count    = 0
   Valid    = False
   Name     = None
   Type     = None
   Idx      = None
   Data     = None

   def __init__(self, name, type, delimiter=','):
      """Load table *name* of the given *type* and open its log files.

      The process exits with status 1 if any of the files cannot be
      opened.
      """
      self.Name       = name
      self.Type       = type
      self.Delimiter  = delimiter
      self.File       = name + ".dat"
      self.Used       = name + ".used"
      self.Stored     = name + ".stored"

      readers = {
         "CSV":      self.read_csv,
         "Sequence": self.read_sequence,
         "Indexed":  self.read_indexed,
         "Keyed":    self.read_keyed,
      }

      reader = readers.get(self.Type)

      # An unrecognised type used to leave 'rc' unbound and crash with a
      # NameError; such a table is now simply left invalid.
      if reader is None:
         rc = 0
      else:
         rc = reader()

      if rc > 0:
         self.Valid = True

      self.ufh = self._open_or_exit(self.Used, 'a+')
      self.sfh = self._open_or_exit(self.Stored, 'a+')

   #------------------------------------------------------------------

   def _open_or_exit(self, path, mode):
      """Open *path* with *mode*; report the error and exit(1) on failure."""
      try:
         return open(path, mode)
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         sys.exit(1)

   #------------------------------------------------------------------

   def __str__(self):
      """Return a one-line description; valid tables include their size."""
      s = "Table: %-10s Type: %-10s" % (self.Name, self.Type)

      if not self.Valid:
         return s + "   "

      s += " * "

      if self.Type == "Sequence":
         s += " Starting value %d" % self.Data
      elif self.Type in ("CSV", "Indexed"):
         s += " %d rows" % len(self.Data)
      elif self.Type == "Keyed":
         s += " %d groups" % len(self.Data)

      return s

   #------------------------------------------------------------------

   def read_csv(self):
      """Read every line of the data file into Data; return the row count."""
      f = self._open_or_exit(self.File, 'r')

      try:
         self.Data = [line.strip() for line in f]
      finally:
         f.close()

      self.Idx = 0

      if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_sequence(self):
      """Read the sequence value from the data file; always returns 1.

      The last non-blank line wins.  A line that is not an integer gives
      0, and so does a file without any value at all (which previously
      left Data as None and broke every later '%d' formatting).
      """
      f = self._open_or_exit(self.File, 'r')

      self.Data = 0

      try:
         for line in f:
            line = line.strip()

            # A trailing blank line must not reset the value to 0.
            if not line:
               continue

            try:
               self.Data = int(line)
            except ValueError:
               self.Data = 0
      finally:
         f.close()

      return 1

   #------------------------------------------------------------------

   def read_keyed(self):
      """Read '[group]' sections into a dict of Group; return the group count.

      Lines containing '#' and blank lines are skipped.  Rows that appear
      before the first group header cannot be attributed to a group; they
      are reported on stderr and dropped.
      """
      f = self._open_or_exit(self.File, 'r')

      group      = None

      self.Data  = {}

      try:
         for line in f:
            line = line.strip()

            if "[" in line:
               group_name            = line.replace('[','').replace(']','')
               group                 = Group(group_name)
               self.Data[group_name] = group

            elif "#" in line or not line:
               continue

            elif group is None:
               sys.stderr.write('[dserver]  %s: row before first group ignored: %s\n'
                                % (self.File, line))

            else:
               group.append(line)
      finally:
         f.close()

      if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_indexed(self):
      """Read 'key:value' lines into a dict; return the number of entries.

      Only the first ':' separates key from value, so values may contain
      colons.  Blank lines are skipped; lines without ':' are reported on
      stderr and skipped.
      """
      f = self._open_or_exit(self.File, 'r')

      self.Data = {}

      try:
         for line in f:
            line = line.strip()

            if not line:
               continue

            if ':' not in line:
               sys.stderr.write('[dserver]  %s: malformed row ignored: %s\n'
                                % (self.File, line))
               continue

            (no, data) = line.split(':', 1)

            self.Data[no] = data
      finally:
         f.close()

      if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def flush(self):
      """Write the unconsumed table content back to the data file.

      Invalid tables are left untouched.  BackupCmd, a shell command that
      copies the current data file to NAME.<timestamp>, is prepared here
      and run by the type specific flush method.
      """
      if not self.Valid:
         return

      ts = datetime.now().strftime('%Y%m%d%H%M%S')

      self.BackupCmd  = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)

      if self.Type == "CSV":
         self.flush_csv()
      elif self.Type == "Sequence":
         self.flush_sequence()
      elif self.Type == "Indexed":
         self.flush_indexed()
      elif self.Type == "Keyed":
         self.flush_keyed()

   #------------------------------------------------------------------

   def flush_csv(self):
      """Back up the data file, then rewrite it with the rows from Idx on.

      Returns 0 if the data file cannot be opened, otherwise None.
      """
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      try:
         for row in self.Data[self.Idx:]:
            f.write("%s\n" % row)
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_sequence(self):
      """Back up the data file, then rewrite it with the current value.

      Returns 0 if the data file cannot be opened, otherwise None.
      """
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      try:
         f.write("%d\n" % self.Data)
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_keyed(self):
      """Back up the data file, then rewrite every group's unconsumed rows.

      Groups are written in sorted key order, each followed by a blank
      line.  Returns 0 if the data file cannot be opened, otherwise None.
      """
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      try:
         for key in sorted(self.Data):
            f.write("[%s]\n" % key)

            group = self.Data[key]

            # Group.set() marks an empty group with Idx == -1; clamp it so
            # the slice never starts from the end of the list.
            for row in group.Data[max(group.Idx, 0):]:
               f.write("%s\n" % row)

            f.write("\n")
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_indexed(self):
      """Indexed tables are not written back."""
      pass

#=====================================================================

def INFO(msg):
   """Log *msg* at INFO level (once logging is set up) and echo it to
   stdout when running in verbose mode."""
   if log:
      log.info(' ' + msg)

   if verbose_flg:
      print("[dserver]  %s" % msg)

#---------------------------------------------------------------------

def ERROR(msg, *args):
   """Log *msg* at ERROR level (once logging is set up) and write it to stderr.

   Any extra positional arguments are converted with str() and appended
   to the message, separated by spaces.  Several callers in this file
   pass a second argument, which used to raise a TypeError.
   """
   if args:
      msg = "%s %s" % (msg, " ".join([str(a) for a in args]))

   if log: log.error(msg)
   sys.stderr.write('[dserver]  %s\n' % msg)

#---------------------------------------------------------------------

def WARNING(msg):
   """Log *msg* at WARNING level, wrapped in asterisks, and echo the plain
   message to stdout when running in verbose mode."""
   if log:
      log.warning('*****' + msg + '*****')

   if verbose_flg:
      print("[dserver]  %s" % msg)

#=====================================================================

def read_config():
   """Read the INI file in data_dir and apply its settings.

   Recognised lines:

     Port=NUMBER                       sets the global PORT (as an int)
     Description=NAME:TYPE[:DELIMITER] creates a Table and appends it to
                                       'tables'; the delimiter defaults
                                       to ','

   Lines containing '#' are treated as comments and skipped.  Section
   headers such as [Config] and [Data] carry no meaning of their own.
   The process exits with status 1 if the file cannot be opened or the
   port is not a number.
   """
   global PORT

   config_file = data_dir + CONFIGFILE

   try:
      f = open(config_file, 'r')
   except IOError as e:
      ERROR('Open failed: ' + str(e))
      sys.exit(1)

   try:
      for raw in f:
         # strip() removes the line terminator (LF or CRLF) without eating
         # the last character of a final line that has no newline.
         line = raw.strip()

         if not line or "#" in line:
            continue

         if "[Config]" in line or "[Data]" in line:
            continue

         if "Port=" in line:
            value = line.split("=", 1)[1].strip()

            # bind() needs an integer; the raw string used to be stored.
            try:
               PORT = int(value)
            except ValueError:
               ERROR('Invalid port in %s: %s' % (config_file, value))
               sys.exit(1)

         elif "Description=" in line:
            # Split on the first '=' and the first two ':' only, so the
            # delimiter itself may be '=' or ':'.
            fields = line.split("=", 1)[1].split(":", 2)

            if len(fields) < 2:
               ERROR('Invalid table description in %s: %s' % (config_file, line))
               continue

            name = fields[0].strip()
            kind = fields[1].strip()

            if len(fields) == 3:
               t = Table(name, kind, fields[2])
            else:
               t = Table(name, kind)

            INFO(str(t))

            tables.append(t)
   finally:
      f.close()

#---------------------------------------------------------------------

def get_table_index(name):
   """Return the position in 'tables' of the first table called *name*,
   or -1 when there is none."""
   for position, table in enumerate(tables):
      if table.Name == name:
         return position

   return -1

#---------------------------------------------------------------------

def _table_for_handle(handle):
   """Return the table registered under *handle*, or None.

   *handle* is the textual index handed out by REG.  Non-numeric and
   out-of-range values yield None.  Negative values are rejected
   explicitly: REG replies -1 for an unknown table name, and Python
   would otherwise index from the end of 'tables'.
   """
   try:
      idx = int(handle)
   except (TypeError, ValueError):
      return None

   if idx < 0 or idx >= len(tables):
      return None

   return tables[idx]

#---------------------------------------------------------------------

def process(str):
   """Execute one client request and return the reply string.

   A request is a '|' separated record whose first field is the command:

     REG|name            -> index of table 'name', or -1
     REGK|.|. / REGI|.   -> accepted, reply "None"
     GETN|hdl            -> next CSV row or sequence number
     GETK|hdl|group      -> next row of 'group' in a Keyed table
     GETI|hdl|key        -> value for 'key' in an Indexed table
     STOC|hdl|data       -> append to a CSV table, reply "1" or "0"
     STOK|hdl|group|data -> append to a group, reply "1" or "0"

   A request with the wrong number of fields is logged and answered with
   "0" (STOC/STOK) or "None" instead of being processed.  An unknown
   handle, or a command that does not fit the table type, leaves the
   default reply in place.
   """
   msg = str.split("|")
   cmd = msg[0]

   if debug_level > 1: INFO("[dserver::process] len %d  msg %s" % (len(msg), msg))

   ts    = datetime.now().strftime('%Y%m%d%H%M%S')
   reply = "None"

   # Exact field counts per command; REG only needs the name to be present.
   field_counts = {"REGK": 3, "REGI": 2, "GETN": 2, "GETK": 3,
                   "GETI": 3, "STOC": 3, "STOK": 4}

   if cmd == "REG":
      bad = len(msg) < 2
   else:
      bad = cmd in field_counts and len(msg) != field_counts[cmd]

   if bad:
      ERROR("[dserver::process]  %s -> Bad Message %s" % (cmd, msg))

      if cmd in ("STOC", "STOK"):
         return "0"

      return reply

   if cmd == "REG":
      name = msg[1].replace('\n','').replace('\r','')
      idx  = get_table_index(name)
      if debug_level > 0: INFO("[dserver::process]  REG '%s' -> %d" % (name, idx))

      reply = "%d" % idx

   elif cmd == "GETN":
      t = _table_for_handle(msg[1])

      if t is not None:
         if t.Type == 'CSV':
            if t.Idx < len(t.Data):
               reply  = t.Data[t.Idx]
               t.Idx += 1
            else:
               reply = "*Exhausted*"
         elif t.Type == "Sequence":
            reply = "%d" % t.Data
            t.Data += 1
         else:
            reply = "UNKNOWN"
         t.ufh.write("%s - %s\n" % (ts, reply))

      if debug_level > 2: INFO("[dserver::process]  GETN -> %s" % reply)

   elif cmd == "GETK":
      grp = msg[2]
      t   = _table_for_handle(msg[1])

      if t is not None:
         # Only Keyed tables hold Group objects.
         if t.Type == "Keyed":
            g = t.Data.get(grp)
         else:
            g = None

         if g is not None:
            if g.Idx < len(g.Data):
               reply  = g.Data[g.Idx]
               g.Idx += 1
            else:
               reply = "*Exhausted*"
         t.ufh.write("%s - %s::%s\n" % (ts, grp, reply))

      if debug_level > 2: INFO("[dserver::process]  GETK %s -> %s" % (grp, reply))

   elif cmd == "GETI":
      idx = msg[2]
      t   = _table_for_handle(msg[1])

      if t is not None:
         # Only Indexed tables map keys to plain strings.
         if t.Type == "Indexed":
            reply = t.Data.get(idx, "UNDEFINED")
         else:
            reply = "UNDEFINED"

         t.ufh.write("%s - %s::%s\n" % (ts, idx, reply))

      if debug_level > 2: INFO("[dserver::process]  GETI %s -> %s" % (idx, reply))

   elif cmd == "STOC":
      data  = msg[2]
      reply = "0"
      t     = _table_for_handle(msg[1])

      # Only CSV tables keep their rows in a list that can be appended to.
      if t is not None and t.Type == "CSV":
         t.Data.append(data)
         t.sfh.write("%s - %s\n" % (ts, data))
         t.sfh.flush()
         if debug_level > 1: INFO("STOC %s" % data)
         reply = "1"

      if debug_level > 2: INFO("[dserver::process]  STOC %s -> %s" % (data, reply))

   elif cmd == "STOK":
      grp   = msg[2]
      data  = msg[3]
      reply = "0"
      t     = _table_for_handle(msg[1])

      if t is not None and t.Type == "Keyed":
         if grp in t.Data:
            g = t.Data[grp]
         else:
            g           = Group(grp)
            t.Data[grp] = g

         g.Data.append(data)
         if debug_level > 1: INFO("STOK %s %s" % (grp, data))
         t.sfh.write("%s - %s::%s\n" % (ts, grp, data))
         t.sfh.flush()               # same durability as STOC
         reply = "1"

      if debug_level > 2: INFO("[dserver::process]  STOK %s %s -> %s" % (grp, data, reply))

   return reply

#---------------------------------------------------------------------

def sig_term(signum, frame):
   """SIGTERM handler: run shutdown().  Both arguments are ignored."""

   shutdown()

#---------------------------------------------------------------------

def shutdown():
   """Flush every table, remove the pid file and exit the process.

   Exits with status 0, or with status 1 if the pid file cannot be
   removed.  Nothing is removed when pid_path has not been set yet.
   """
   INFO("Server shutdown at %s" % datetime.now())

   for table in tables:
      table.flush()

   if pid_path is not None:
      try:
         os.unlink(pid_path)
      except (IOError, OSError) as e:
         # os.unlink reports failures as OSError, which the previous
         # 'except IOError' did not catch on Python 2.
         ERROR('Unlink failed: ' + str(e))
         sys.exit(1)

   sys.exit(0)

#---------------------------------------------------------------------

def check_running():
   """Return the pid of a running server, or None.

   The pid is read from the pid file and probed with signal 0.  When the
   file is missing the result is None.  When the file names a process
   that no longer exists, or does not contain a positive integer, it is
   treated as stale: the file is removed and None is returned.  Any other
   failure of the probe is reported and the process exits.
   """
   import errno

   try:
      pfp = open(pid_path, 'r')
   except IOError:
      return None
   except Exception:
      ERROR("Unexpected error: %s" % sys.exc_info()[0])
      raise

   try:
      line = pfp.readline().strip()
   finally:
      pfp.close()

   try:
      dserver_pid = int(line)
   except ValueError:
      dserver_pid = None

   # Pid 0 and negative pids address process groups, not a single server.
   if dserver_pid is not None and dserver_pid > 0:
      try:
         os.kill(dserver_pid, 0)
      except OSError as e:
         if e.errno != errno.ESRCH:
            ERROR("kill() failed:" + str(e))
            sys.exit(0)
      else:
         return dserver_pid

   INFO("[dserver]  Stale dserver pid file!")
   os.unlink(pid_path)

   return None

#---------------------------------------------------------------------

def create_pidfile():
   """Write the current process id to the pid file and return it.

   Reports the error and exits with status 0 if the file cannot be
   opened.
   """
   pid = os.getpid()

   try:
      pfp = open(pid_path, 'w')
   except IOError as e:
      ERROR("Open failed - " + str(e))
      sys.exit(0)
   else:
      pfp.write("%d" % pid)
      pfp.close()

   INFO("Running server with PID -> %d" % pid)

   return pid

#---------------------------------------------------------------------

def become_daemon():
   """Fork into the background and return the pid of the daemon.

   The parent exits with status 0 after a short pause.  The child writes
   the pid file, waits, and then starts a new session with os.setsid().
   If the fork fails the error is reported and the process exits with
   status 1.
   """
   # os.fork() signals failure by raising OSError; it never returns -1.
   try:
      pid = os.fork()
   except OSError as e:
      ERROR("fork() failed! " + str(e))
      time.sleep(1)
      sys.exit(1)

   if pid != 0:                                            # In Parent
      time.sleep(1)
      sys.exit(0)

   pid = create_pidfile()                                   # In child

   # NOTE(review): the pauses are kept from the original code; presumably
   # they give the parent time to exit first -- confirm before removing.
   time.sleep(1)
   time.sleep(2)

   os.setsid()

   return pid

#---------------------------------------------------------------------

def init():
   """Start-up sequence: refuse to run twice, create the pid file (after
   daemonising when requested), set up file logging and read the
   configuration."""
   global log

   running_pid = check_running()

   if running_pid:
      print("[dserver]  Server already running! (pid = %d)" % running_pid)
      sys.exit(0)

   pid = become_daemon() if daemon_flg else create_pidfile()

   # Logging goes to LOGFILE, resolved against the current directory.
   log     = logging.getLogger('dserver')
   handler = logging.FileHandler(LOGFILE)

   handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
   log.addHandler(handler)
   log.setLevel(logging.INFO)

   INFO("Started processing")

   read_config()

   if not silent_flg:
      INFO("Server PID is %d" % pid)

#---------------------------------------------------------------------

def terminate():
   """Send SIGTERM to the running server, if there is one; return 0.

   With the wait flag set, poll once a second until the process has
   gone.  Any probe failure other than 'no such process' is reported and
   ends this process.
   """
   import errno

   dserver_pid = check_running()

   if dserver_pid:
      if not silent_flg:
         INFO("Terminating server with pid, %d" % dserver_pid)

      os.kill(dserver_pid, signal.SIGTERM)

      if wait_flg:
         while True:
            try:
               # Was a bare kill(), an undefined name in this module.
               os.kill(dserver_pid, 0)
            except OSError as e:
               if e.errno == errno.ESRCH:
                  break

               ERROR("kill() failed:" + str(e))
               sys.exit(0)

            time.sleep(1)

   return 0

#---------------------------------------------------------------------

def check():
   """Report on stdout whether a server is running; exits with status 0
   when one is."""
   pid = check_running()

   if not pid:
      print("[dserver]  Server not running")
      return

   print("[dserver]  Server already running! (pid = %d)" % pid)
   sys.exit(0)

#==== Socket Server ==================================================

def init_connection():
   """Create the listening TCP socket and store it in the global sockobj."""
   global sockobj

   sockobj = socket(AF_INET, SOCK_STREAM)  # make a TCP socket object

   # PORT may have been read from the INI file as text; bind() needs an int.
   sockobj.bind((HOST, int(PORT)))         # bind it to server port number
   sockobj.listen(10)                      # allow up to 10 pending connects

#---------------------------------------------------------------------

def handle_client(connection):             # in spawned thread: reply
   """Serve one client: read requests and send replies until it disconnects.

   Each recv() of up to 1024 bytes is treated as one complete request.
   The loop ends when the peer closes the connection or a socket error
   occurs.  The connection is always closed, even when processing a
   request raises.
   """
   try:
      while True:                          # read, write a client socket
         try:
            request = connection.recv(1024)
         except (IOError, OSError):
            break

         if debug_level > 0: INFO('[dserver]  Request -> "%s"' % request)

         if not request: break

         reply = process(request)

         if debug_level > 0: INFO('[dserver]  Reply   -> "%s..."' % reply[0:30])

         # send() may transmit only part of the reply; sendall() does not.
         try:
            connection.sendall(reply)
         except (IOError, OSError):
            break
   finally:
      connection.close()

#---------------------------------------------------------------------

def dispatcher():
   """Accept connections forever, handing each one to handle_client() in
   a new thread."""
   while True:
      # Blocks until the next client connects.
      client, peer = sockobj.accept()

      INFO('Host (%s) - Connected at %s' % (peer[0], datetime.now()))

      thread.start_new_thread(handle_client, (client,))

#=====================================================================

def main():
   """Command line entry point; returns the process exit status.

   Options:

     -c  report whether a server is running, then stop
     -d  raise the debug level (repeatable)
     -D  run as a daemon
     -s  silent
     -T  terminate the running server
     -v  verbose
     -V  print the version and stop
     -w  with -T, wait until the server has gone
     -?  print the module documentation and stop

   The DSERVER_DIR environment variable must be set.  The working
   directory is changed to the DATA directory below the directory the
   program was started from.
   """
   global check_flg
   global daemon_flg
   global silent_flg
   global terminate_flg
   global verbose_flg
   global wait_flg
   global debug_level
   global dserver_dir
   global data_dir
   global pid_path

   try:
      opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?")
   except getopt.error:
      print(__doc__)
      return 1

   try:
      dserver_dir  = os.environ["DSERVER_DIR"]
   except KeyError:
      print("Set DSERVER_DIR environment variable and rerun!")
      return 1

   wrk_path  = os.getcwd()

   # data_dir = dserver_dir  + '/DATA/'

   data_dir = wrk_path + '/DATA/'
   pid_path = data_dir + PIDFILE

   os.chdir(data_dir)

   for o, a in opts:
      if o == '-d':
         debug_level   += 1
      elif o == '-c':
         check_flg      = True
      elif o == '-D':
         daemon_flg     = True
      elif o == '-s':
         # Was assigned to a misspelt local name, so -s had no effect.
         silent_flg     = True
      elif o == '-T':
         terminate_flg  = True
      elif o == '-v':
         verbose_flg    = True
      elif o == '-V':
         print("[dserver]  Version: %s" % __version__)
         return 1
      elif o == '-w':
         wait_flg       = True
      elif o == '-?':
         print(__doc__)
         return 1

   print("[dserver]  Listening on port %s - running from %s" % (PORT, os.getcwd()))

   if check_flg:
      check()
      return 0

   if terminate_flg:
      terminate()
      return 0

   if (debug_level > 0): print("Debugging level set to %d" % debug_level)

   for arg in args:
      print(arg)

   signal.signal(signal.SIGTERM, sig_term)

   init()

   init_connection()

   dispatcher()

   return 0

#---------------------------------------------------------------------

# Script entry point: run main() and turn Ctrl-C into an orderly shutdown.
if __name__ == '__main__' or __name__ == sys.argv[0]:
   try:
      status = main()
      sys.exit(status)
   except KeyboardInterrupt:
      print("[dserver]  Interrupted!")
      shutdown()

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation

Problems to fix:

To Do:

Issues:

"""


Other

#!/usr/bin/env python
#
#  Purpose: Threaded data server implementation
#
#  $Id:$
#
#---------------------------------------------------------------------

"""
  Threaded server model

  Server side: open a socket on a port, listen for
  a message from a client, and accept a request and
  service it.

  The server spawns a thread to handle each client connection.
  Threads share global memory space with main thread;
  This is more portable than fork -- not yet on Windows;

  This version has been extended to use the standard Python
  logging module.

  Add the delimiter to the INI file to allow use of alternate
  delimiters in transmitted data - so data with embedded commas
  can be used.
"""
#---------------------------------------------------------------------

import os
import re
import csv
import sys
import getopt
import thread
import time
import signal
import logging

#---------------------------------------------------------------------

from socket   import *          # get socket constructor and constants
from datetime import datetime

#---------------------------------------------------------------------

# Version string reported by the -V command line option in the first listing.
__version__   = "1.1.3"
# SCCS-style identification string; the %s placeholder is never filled in
# by the code visible in this file.
__id__        = "@(#)  dserver.py  [%s]  30/04/2008"

# Command line switches.
check_flg     = False
daemon_flg    = False
silent_flg    = False
terminate_flg = False
verbose_flg   = False
wait_flg      = False

# Debug verbosity; higher values enable more INFO output.
debug_level   = 0

HOST          = ''             #  Bind address - '' means all local interfaces (INADDR_ANY)
PORT          = 9575           #  Default listen port; read_config() overrides it from Port=

# Listening socket; None until one is created.
sockobj       = None

# Directory settings; None until assigned.
dserver_dir   = None
data_dir      = None
pid_path      = None

# File names; read_config() prefixes CONFIGFILE with data_dir.
CONFIGFILE    = "dserver.ini"
LOGFILE       = "dserver.log"
PIDFILE       = "dserver.pid"

# Table objects in registration order; a client handle is an index into it.
tables        = []

# Not referenced by the code visible in this file.
INVALID       = "INVALID"

# Logger used by INFO/ERROR/WARNING; None until one is assigned.
log           = None

#=====================================================================

class Group:
   """A named list of rows plus a cursor (Idx) to the next row to serve."""

   # Class-level defaults; every instance overrides them in __init__.
   Name     = None
   Idx      = None
   Data     = None

   def __init__(self, name):
      """Create an empty group called *name* with its cursor at row 0."""
      self.Name, self.Idx, self.Data = name, 0, []

   def __str__(self):
      """Return a short summary: the group name and its row count."""
      return "Grp %s  Len %d" % (self.Name, len(self.Data))

   def append(self, s):
      """Add row *s* to the end of the group."""
      self.Data.append(s)

   def set(self):
      """Reset the cursor: 0 when the group has rows, -1 when it is empty."""
      self.Idx = 0 if len(self.Data) > 0 else -1

#---------------------------------------------------------------------

class Table:
   """One data table backed by files in the current working directory.

   For a table called NAME the class uses three files:

     NAME.dat     - the table content, read at construction time
     NAME.used    - append-only log of values handed out to clients
     NAME.stored  - append-only log of values stored by clients

   The layout of NAME.dat depends on the table type:

     CSV       one row per line; Data is a list, Idx the next row to serve
     Sequence  a single integer; Data is that integer
     Indexed   'key:value' lines; Data is a dict of strings
     Keyed     '[group]' headers followed by rows; Data is a dict of Group

   A table whose type is not one of the above, or whose reader returned
   no rows, is constructed but left with Valid == False.
   """

   Count    = 0
   Valid    = False
   Name     = None
   Type     = None
   Idx      = None
   Data     = None

   def __init__(self, name, type, delimiter=','):
      """Load table *name* of the given *type* and open its log files.

      The process exits with status 1 if any of the files cannot be
      opened.
      """
      self.Name       = name
      self.Type       = type
      self.Delimiter  = delimiter
      self.File       = name + ".dat"
      self.Used       = name + ".used"
      self.Stored     = name + ".stored"

      readers = {
         "CSV":      self.read_csv,
         "Sequence": self.read_sequence,
         "Indexed":  self.read_indexed,
         "Keyed":    self.read_keyed,
      }

      reader = readers.get(self.Type)

      # An unrecognised type used to leave 'rc' unbound and crash with a
      # NameError; such a table is now simply left invalid.
      if reader is None:
         rc = 0
      else:
         rc = reader()

      if rc > 0:
         self.Valid = True

      self.ufh = self._open_or_exit(self.Used, 'a+')
      self.sfh = self._open_or_exit(self.Stored, 'a+')

   #------------------------------------------------------------------

   def _open_or_exit(self, path, mode):
      """Open *path* with *mode*; report the error and exit(1) on failure."""
      try:
         return open(path, mode)
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         sys.exit(1)

   #------------------------------------------------------------------

   def __str__(self):
      """Return a one-line description; valid tables include their size."""
      s = "Table: %-10s Type: %-10s" % (self.Name, self.Type)

      if not self.Valid:
         return s + "   "

      s += " * "

      if self.Type == "Sequence":
         s += " Starting value %d" % self.Data
      elif self.Type in ("CSV", "Indexed"):
         s += " %d rows" % len(self.Data)
      elif self.Type == "Keyed":
         s += " %d groups" % len(self.Data)

      return s

   #------------------------------------------------------------------

   def read_csv(self):
      """Read every line of the data file into Data; return the row count."""
      f = self._open_or_exit(self.File, 'r')

      try:
         self.Data = [line.strip() for line in f]
      finally:
         f.close()

      self.Idx = 0

      if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_sequence(self):
      """Read the sequence value from the data file; always returns 1.

      The last non-blank line wins.  A line that is not an integer gives
      0, and so does a file without any value at all (which previously
      left Data as None and broke every later '%d' formatting).
      """
      f = self._open_or_exit(self.File, 'r')

      self.Data = 0

      try:
         for line in f:
            line = line.strip()

            # A trailing blank line must not reset the value to 0.
            if not line:
               continue

            try:
               self.Data = int(line)
            except ValueError:
               self.Data = 0
      finally:
         f.close()

      return 1

   #------------------------------------------------------------------

   def read_keyed(self):
      """Read '[group]' sections into a dict of Group; return the group count.

      Lines containing '#' and blank lines are skipped.  Rows that appear
      before the first group header cannot be attributed to a group; they
      are reported on stderr and dropped.
      """
      f = self._open_or_exit(self.File, 'r')

      group      = None

      self.Data  = {}

      try:
         for line in f:
            line = line.strip()

            if "[" in line:
               group_name            = line.replace('[','').replace(']','')
               group                 = Group(group_name)
               self.Data[group_name] = group

            elif "#" in line or not line:
               continue

            elif group is None:
               sys.stderr.write('[dserver]  %s: row before first group ignored: %s\n'
                                % (self.File, line))

            else:
               group.append(line)
      finally:
         f.close()

      if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_indexed(self):
      """Read 'key:value' lines into a dict; return the number of entries.

      Only the first ':' separates key from value, so values may contain
      colons.  Blank lines are skipped; lines without ':' are reported on
      stderr and skipped.
      """
      f = self._open_or_exit(self.File, 'r')

      self.Data = {}

      try:
         for line in f:
            line = line.strip()

            if not line:
               continue

            if ':' not in line:
               sys.stderr.write('[dserver]  %s: malformed row ignored: %s\n'
                                % (self.File, line))
               continue

            (no, data) = line.split(':', 1)

            self.Data[no] = data
      finally:
         f.close()

      if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def flush(self):
      """Write the unconsumed table content back to the data file.

      Invalid tables are left untouched.  BackupCmd, a shell command that
      copies the current data file to NAME.<timestamp>, is prepared here
      and run by the type specific flush method.
      """
      if not self.Valid:
         return

      ts = datetime.now().strftime('%Y%m%d%H%M%S')

      self.BackupCmd  = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)

      print("Flushing %s" % self.Name)

      if self.Type == "CSV":
         self.flush_csv()
      elif self.Type == "Sequence":
         self.flush_sequence()
      elif self.Type == "Indexed":
         self.flush_indexed()
      elif self.Type == "Keyed":
         self.flush_keyed()

   #------------------------------------------------------------------

   def flush_csv(self):
      """Back up the data file, then rewrite it with the rows from Idx on.

      Returns 0 if the data file cannot be opened, otherwise None.
      """
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      try:
         for row in self.Data[self.Idx:]:
            f.write("%s\n" % row)
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_sequence(self):
      """Back up the data file, then rewrite it with the current value.

      Returns 0 if the data file cannot be opened, otherwise None.
      """
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      try:
         f.write("%d\n" % self.Data)
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_keyed(self):
      """Back up the data file, then rewrite every group's unconsumed rows.

      Groups are written in sorted key order, each followed by a blank
      line.  Returns 0 if the data file cannot be opened, otherwise None.
      """
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      try:
         for key in sorted(self.Data):
            f.write("[%s]\n" % key)

            group = self.Data[key]

            # Group.set() marks an empty group with Idx == -1; clamp it so
            # the slice never starts from the end of the list.
            for row in group.Data[max(group.Idx, 0):]:
               f.write("%s\n" % row)

            f.write("\n")
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_indexed(self):
      """Indexed tables are not written back."""
      pass

#=====================================================================

def INFO(msg):
   """Log *msg* at INFO level (once logging is set up) and echo it to
   stdout when running in verbose mode."""
   if log:
      log.info(' ' + msg)

   if verbose_flg:
      print("[dserver]  %s" % msg)

#---------------------------------------------------------------------

def ERROR(msg, *args):
   """Log *msg* at ERROR level (once logging is set up) and write it to stderr.

   Any extra positional arguments are converted with str() and appended
   to the message, separated by spaces.  Several callers in this file
   pass a second argument, which used to raise a TypeError.
   """
   if args:
      msg = "%s %s" % (msg, " ".join([str(a) for a in args]))

   if log: log.error(msg)
   sys.stderr.write('[dserver]  %s\n' % msg)

#---------------------------------------------------------------------

def WARNING(msg):
   """Log *msg* at WARNING level, wrapped in asterisks, and echo the plain
   message to stdout when running in verbose mode."""
   if log:
      log.warning('*****' + msg + '*****')

   if verbose_flg:
      print("[dserver]  %s" % msg)

#=====================================================================

def read_config():
   """Read the INI file in data_dir and apply its settings.

   Recognised lines:

     Port=NUMBER                       sets the global PORT (as an int)
     Description=NAME:TYPE[:DELIMITER] creates a Table and appends it to
                                       'tables'; the delimiter defaults
                                       to ','

   Lines containing '#' are treated as comments and skipped.  Section
   headers such as [Config] and [Data] carry no meaning of their own.
   The process exits with status 1 if the file cannot be opened or the
   port is not a number.
   """
   global PORT

   config_file = data_dir + CONFIGFILE

   try:
      f = open(config_file, 'r')
   except IOError as e:
      ERROR('Open failed: ' + str(e))
      sys.exit(1)

   try:
      for raw in f:
         # strip() removes the line terminator (LF or CRLF) without eating
         # the last character of a final line that has no newline.
         line = raw.strip()

         if not line or "#" in line:
            continue

         if "[Config]" in line or "[Data]" in line:
            continue

         if "Port=" in line:
            value = line.split("=", 1)[1].strip()

            try:
               PORT = int(value)
            except ValueError:
               ERROR('Invalid port in %s: %s' % (config_file, value))
               sys.exit(1)

         elif "Description=" in line:
            # Split on the first '=' and the first two ':' only, so the
            # delimiter itself may be '=' or ':'.
            fields = line.split("=", 1)[1].split(":", 2)

            if len(fields) < 2:
               ERROR('Invalid table description in %s: %s' % (config_file, line))
               continue

            name = fields[0].strip()
            kind = fields[1].strip()

            if len(fields) == 3:
               t = Table(name, kind, fields[2])
            else:
               t = Table(name, kind)

            INFO(str(t))

            tables.append(t)
   finally:
      f.close()

#---------------------------------------------------------------------

def get_table_index(name):
   """Return the position in 'tables' of the first table called *name*,
   or -1 when there is none."""
   for position, table in enumerate(tables):
      if table.Name == name:
         return position

   return -1

#---------------------------------------------------------------------

def _get_table(handle):
   """Return the table selected by a client-supplied handle, or None.

   handle is the text of a request's handle field.  Only indexes from 0 to
   len(tables)-1 are accepted: the -1 that REG replies for an unknown
   table would otherwise pick the last table through negative indexing.
   """
   try:
      idx = int(handle)
   except ValueError:
      return None

   if 0 <= idx < len(tables):
      return tables[idx]

   return None

#---------------------------------------------------------------------

def process(str):
   """Execute one client request and return the reply text.

   str is the raw request; its fields are separated by '|' and the first
   field is the command:

      REG|name            reply is the handle (index) of table name, or -1
      REGK|.|.  REGI|.    only checked for length; reply is "None"
      GETN|hdl            next row of a CSV table or next Sequence number
      GETK|hdl|grp        next row of group grp of the table
      GETI|hdl|idx        value stored under idx, or "UNDEFINED"
      STOC|hdl|data       append data to a list table;  reply "1" or "0"
      STOK|hdl|grp|data   append data to group grp;     reply "1" or "0"

   A request with the wrong number of fields is logged through ERROR and
   answered with the failure reply of its command ("-1", "None" or "0")
   instead of being processed further.  Unknown commands and unknown
   handles yield "None" (or "0" for the store commands).
   """
   import re           # used by GETK; not imported at module level here

   msg = str.split("|")
   l   = len(msg)

   if debug_level > 1: INFO("[dserver::process] len %d  msg %s" % (l, msg))

   ts    = datetime.now().strftime('%Y%m%d%H%M%S')
   reply = "None"
   cmd   = msg[0]

   if cmd == "REG":
      if l < 2:
         ERROR("[dserver::process]  REG -> Bad Message %s" % (msg,))
         return "-1"

      name = msg[1].replace('\n','').replace('\r','')
      idx  = get_table_index(name)
      if debug_level > 0: INFO("[dserver::process]  REG '%s' -> %d" % (name, idx))

      reply = "%d" % idx

   elif cmd == "REGK":
      if l != 3:
         ERROR("[dserver::process]  REGK -> Bad Message %s" % (msg,))

   elif cmd == "REGI":
      if l != 2:
         ERROR("[dserver::process]  REGI -> Bad Message %s" % (msg,))

   elif cmd == "GETN":
      if l != 2:
         ERROR("[dserver::process]  GETN -> Bad Message %s" % (msg,))
         return reply

      t = _get_table(msg[1])

      if t is not None:
         if t.Type == 'CSV':
            if t.Idx < len(t.Data):
               reply  = t.Data[t.Idx]
               t.Idx += 1
            else:
               reply = "*Exhausted*"
         elif t.Type == "Sequence":
            reply = "%d" % t.Data
            t.Data += 1
         else:
            reply = "UNKNOWN"
         t.ufh.write("%s - %s\n" % (ts, reply))

      if debug_level > 2: INFO("[dserver::process]  GETN -> %s" % reply)

   elif cmd == "GETK":
      if l != 3:
         ERROR("[dserver::process]  GETK -> Bad Message %s" % (msg,))
         return reply

      grp = msg[2]
      t   = _get_table(msg[1])

      if t is not None:
         g = None

         if isinstance(t.Data, dict):
            g = t.Data.get(grp)

         # Only Group objects can be iterated; anything else is "not found".
         if isinstance(g, Group):
            if g.Idx < len(g.Data):
               reply  = g.Data[g.Idx]
               reply  = re.sub(", *", ",", reply)   # drop blanks after commas
               g.Idx += 1
            else:
               reply = "*Exhausted*"
         t.ufh.write("%s - %s::%s\n" % (ts, grp, reply))

      if debug_level > 2: INFO("[dserver::process]  GETK %s -> %s" % (grp, reply))

   elif cmd == "GETI":
      if l != 3:
         ERROR("[dserver::process]  GETI -> Bad Message %s" % (msg,))
         return reply

      idx = msg[2]
      t   = _get_table(msg[1])

      if t is not None:
         value = None

         if isinstance(t.Data, dict):
            value = t.Data.get(idx)

         # A Group cannot be sent as reply text, so treat it as undefined.
         if value is None or isinstance(value, Group):
            reply = "UNDEFINED"
         else:
            reply = value

         t.ufh.write("%s - %s::%s\n" % (ts, idx, reply))

      if debug_level > 2: INFO("[dserver::process]  GETI %s -> %s" % (idx, reply))

   elif cmd == "STOC":
      reply = "0"

      if l != 3:
         ERROR("[dserver::process]  STOC -> Bad Message %s" % (msg,))
         return reply

      data = msg[2]
      t    = _get_table(msg[1])

      # Appending only makes sense for tables whose data is a list of rows.
      if t is not None and isinstance(t.Data, list):
         t.Data.append(data)
         t.sfh.write("%s - %s\n" % (ts, data))
         t.sfh.flush()
         if debug_level > 1: INFO("STOC %s" % data)
         reply = "1"

      if debug_level > 2: INFO("[dserver::process]  STOC %s -> %s" % (data, reply))

   elif cmd == "STOK":
      reply = "0"

      if l != 4:
         ERROR("[dserver::process]  STOK -> Bad Message %s" % (msg,))
         return reply

      grp  = msg[2]
      data = msg[3]
      t    = _get_table(msg[1])

      if t is not None and isinstance(t.Data, dict):
         g = t.Data.get(grp)

         if g is None:                       # first row stored for this group
            g           = Group(grp)
            t.Data[grp] = g

         if isinstance(g, Group):
            g.Data.append(data)
            if debug_level > 1: INFO("STOK %s %s" % (grp, data))
            t.sfh.write("%s - %s::%s\n" % (ts, grp, data))
            t.sfh.flush()                    # same as STOC
            reply = "1"

      if debug_level > 2: INFO("[dserver::process]  STOK %s %s -> %s" % (grp, data, reply))

   return reply

#---------------------------------------------------------------------

def sig_term(signum, frame):
   """SIGTERM handler: shut the server down.

   main() installs this with signal.signal(); the signum and frame
   arguments supplied by the signal machinery are not used.
   """

   shutdown()

#---------------------------------------------------------------------

def shutdown():
   INFO("Server shutdown at %s" % datetime.now())

   print "\n"

   for i in range(len(tables)):
      tables[i].flush()

   print "*SHUTDOWN*"

   try:
      os.unlink(pid_path)
   except IOError, e:
      ERROR('Unlink failed: ' + str(e))
      sys.exit(1)

   sys.exit(0)

#---------------------------------------------------------------------

def check_running():
   try:
      pfp = open(pid_path, 'r')
   except IOError, (errno, strerror):
      pfp = None
      # ERROR("I/O error(%s): %s" % (errno, strerror))
   except:
      ERROR("Unexpected error:", sys.exc_info()[0])
      raise

   if pfp:
      line = pfp.readline()
      line = line.strip()

      dserver_pid   = int(line)

      noProcess    = 0

      try:
         os.kill(dserver_pid, 0)
      except OSError, e:
         if e.errno == 3:
            noProcess = 1
         else:
            ERROR("kill() failed:" + str(e))
            sys.exit(0)

      if noProcess:
         INFO("[dserver]  Stale dserver pid file!")
         pfp.close()
         os.unlink(pid_path)

         return None
      else:
         pfp.close()
         return dserver_pid

      return dserver_pid
   else:
      return None

#---------------------------------------------------------------------

def create_pidfile():
   pid = os.getpid()

   try:
      pfp = open(pid_path, 'w')
   except IOError, e:
      ERROR("Open failed - " + str(e))
      sys.exit(0)

   pfp.write("%d" % pid)

   pfp.close()

   INFO("Running server with PID -> %d" % pid)

   return pid

#---------------------------------------------------------------------

def become_daemon():
   pid = os.fork()

   if pid == 0:                                             # In child
      pid = create_pidfile()
      time.sleep(1)
   elif pid == -1:                                # Should not happen!
      ERROR("fork() failed!")
      time.sleep(1)
      sys.exit(0)
   else:                                                   # In Parent
      time.sleep(1)
      sys.exit(0)

   time.sleep(2)

   os.setsid()

   return pid

#---------------------------------------------------------------------

def init():
   pid = check_running()

   if pid:
      print "[dserver]  Server already running! (pid = %d)" % pid
      sys.exit(0)

   if daemon_flg:
      pid = become_daemon()
   else:
      pid = create_pidfile()

   global log

   log  = logging.getLogger('dserver')
   hdlr = logging.FileHandler(LOGFILE)
   fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

   hdlr.setFormatter(fmtr)
   log.addHandler(hdlr) 
   log.setLevel(logging.INFO)

   INFO("Started processing")

   read_config()

   if (not silent_flg):
      INFO("Server PID is %d" % pid)

   print "\nData Loaded..."

#---------------------------------------------------------------------

def terminate():
   dserver_pid = check_running()

   if dserver_pid:
      if (not silent_flg):
         INFO("Terminating server with pid, %d" % dserver_pid)

      os.kill(dserver_pid, signal.SIGTERM)

      if (wait_flg):
         while True:
            try:
               kill(dserver_pid, 0)
            except OSError, e:
               if e.errno == 3:
                  break
               else:
                  ERROR("kill() failed:" + str(e))
                  sys.exit(0)
 
            time.sleep(1)

   return 0

#---------------------------------------------------------------------

def check():
   pid = check_running()

   if pid:
      print "[dserver]  Server already running! (pid = %d)" % pid
      sys.exit(0)
   else:
      print "[dserver]  Server not running"

#==== Socket Server ==================================================

def init_connection():
   """Create the listening TCP socket and store it in the global sockobj."""
   global sockobj

   sockobj = socket(AF_INET, SOCK_STREAM)  # make a TCP socket object

   # Allow the port to be re-bound straight after a restart; without this
   # bind() can fail with "Address already in use" while old connections
   # are still in TIME_WAIT.
   sockobj.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

   sockobj.bind((HOST, PORT))              # bind it to server port number
   sockobj.listen(10)                      # allow upto 10 pending connects

#---------------------------------------------------------------------

def handle_client(connection):             # in spawned thread: reply
   """Serve one client: answer each request until the connection ends.

   connection is the connected socket returned by accept().  Each chunk
   read (up to 1024 bytes) is handed to process() and the reply is sent
   back.  The loop ends when the peer closes the connection or a socket
   operation fails; the socket is always closed, also when process()
   raises.
   """
   try:
      while True:                          # read, write a client socket
         try:
            request = connection.recv(1024)
         except Exception:
            break

         if debug_level > 0: INFO('[dserver]  Request -> "%s"' % request)

         if not request: break

         reply = process(request)

         if debug_level > 0: INFO('[dserver]  Reply   -> "%s..."' % reply[0:30])

         try:
            # sendall() keeps sending until the whole reply has gone out;
            # send() may transmit only part of it.
            connection.sendall(reply)
         except Exception:
            break
   finally:
      connection.close()

#---------------------------------------------------------------------

def dispatcher():
   """Accept connections forever, starting one thread per client."""
   while True:
      # Block until the next client connects.
      client, peer = sockobj.accept()

      INFO('Host (%s) - Connected at %s' % (peer[0], datetime.now()))

      thread.start_new_thread(handle_client, (client,))

#=====================================================================

def main():
   global check_flg
   global daemon_flg
   global terminate_flg
   global verbose_flg
   global wait_flg
   global debug_level
   global dserver_dir
   global data_dir
   global pid_path

   try:
      opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?")
   except getopt.error, msg:
      print __doc__
      return 1

   try:
      dserver_dir  = os.environ["DSERVER_DIR"]
   except KeyError, e:
      print "Set DSERVER_DIR environment variable and rerun!"
      return 1

   wrk_path  = os.getcwd()
   wrk_dir   = os.path.basename(wrk_path)

   # data_dir = dserver_dir  + '/DATA/'

   data_dir = wrk_path + '/DATA/'
   pid_path = data_dir + PIDFILE

   os.chdir(data_dir)

   for o, a in opts:
      if o == '-d':
         debug_level   += 1
      elif o == '-c':
         check_flg      = True
      elif o == '-D':
         daemon_flg     = True
      elif o == '-s':
         tsilent_flg    = True
      elif o == '-T':
         terminate_flg  = True
      elif o == '-v':
         verbose_flg    = True
      elif o == '-V':
         print "[dserver]  Version: %s" % __version__
         return 1
      elif o == '-w':
         wait_flg       = True
      elif o == '-?':
         print __doc__
         return 1

   print "[dserver]  Listening on port %s - running from %s" % (PORT, os.getcwd())

   if check_flg:
      check()
      return 0

   if terminate_flg:
      terminate()
      return 0

   if (debug_level > 0): print "Debugging level set to %d" % debug_level

   if args:
      for arg in args:
         print arg

   signal.signal(signal.SIGTERM, sig_term)

   init()

   init_connection()

   dispatcher()

   return 0

#---------------------------------------------------------------------

if __name__ == '__main__' or __name__ == sys.argv[0]:
   try:
      sys.exit(main())
   except KeyboardInterrupt, e:
      print "[dserver]  Interrupted!"
      shutdown()

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation

Problems to fix:

To Do:

Issues:

"""


Old Versions

1.1.5

#!/usr/bin/env python
#
#  Purpose: Threaded data server implementation
#
#  $Id:$
#
#---------------------------------------------------------------------

"""
  Threaded server model

  Server side: open a socket on a port, listen for
  a message from a client, and accept a request and
  service it.

  The server spawns a thread to handle each client connection.
  Threads share global memory space with main thread;
  This is more portable than fork -- not yet on Windows;

  This version has been extended to use the standard Python
  logging module.

  Add the delimiter to the INI file to allow use of alternate
  delimiters in transmitted data - so data with embedded commas
  can be used.
"""
#---------------------------------------------------------------------

import os
import re
import csv
import sys
import getopt
import thread
import time
import signal
import logging

#---------------------------------------------------------------------

from socket   import *          # get socket constructor and constants
from datetime import datetime

#---------------------------------------------------------------------

__version__   = "1.1.5"
__id__        = "@(#)  dserver.py  [%s]  2008-06-10" % __version__

# Run-time switches (see the option loop in main()).
check_flg     = False          #  -c: only report whether a server is running
daemon_flg    = False          #  -D: init() forks into the background
silent_flg    = False          #  read by init()/terminate() to suppress PID messages
terminate_flg = False          #  -T: stop the running server
verbose_flg   = False          #  -v: INFO()/WARNING() also print to stdout
wait_flg      = False          #  -w: terminate() waits until the server has gone

debug_level   = 0              #  incremented once per -d option

HOST          = ''             #  Bind address - '' makes bind() listen on all local interfaces
PORT          = 9572           #  Default (non-reserved) port; a Port= line in the INI file overrides it

sockobj       = None           #  Listening socket, created by init_connection()

dserver_dir   = None           #  Value of the DSERVER_DIR environment variable
data_dir      = None           #  <current directory>/DATA/, set in main()
pid_path      = None           #  data_dir + PIDFILE, set in main()

CONFIGFILE    = "dserver.ini"
LOGFILE       = "dserver.log"
PIDFILE       = "dserver.pid"

tables        = []             #  Table objects in INI-file order; a client handle is an index into this list

INVALID       = "INVALID"

log           = None           #  logging.Logger, created by init(); None until then

#=====================================================================

class Group:
   """A named list of data rows with a read cursor.

   Name -- the group's name
   Data -- the rows, in insertion order
   Idx  -- index of the next row to hand out (-1 after set() on an
           empty group)
   """
   Name     = None
   Idx      = None
   Data     = None

   def __init__(self, name):
      """Create an empty group called name with the cursor at 0."""
      self.Name, self.Idx, self.Data = name, 0, []

   def __str__(self):
      """Return a one-line summary: name and number of rows."""
      return "Grp %s  Len %d" % (self.Name, len(self.Data))

   def append(self, s):
      """Add the row s at the end of the group."""
      self.Data.append(s)

   def set(self):
      """Rewind the cursor: 0 if there are rows, otherwise -1."""
      if not self.Data:
         self.Idx = -1
      else:
         self.Idx = 0

#---------------------------------------------------------------------

class Table:
   Count    = 0
   Valid    = False
   Name     = None
   Type     = None
   Idx      = None
   Data     = None

   def __init__(self, name, type, delimiter=','):
      self.Name       = name
      self.Type       = type
      self.Delimiter  = delimiter
      self.File       = name + ".dat"
      self.Used       = name + ".used"
      self.Stored     = name + ".stored"

      sys.stderr.write("Loading %s\n" % self.Name)
      sys.stderr.flush()

      if self.Type == "CSV":
         rc = self.read_csv()
      elif self.Type == "Sequence":
         rc = self.read_sequence()
      elif self.Type == "Indexed":
         rc = self.read_indexed()
      elif self.Type == "Keyed":
         rc = self.read_keyed()

      if rc > 0:
         self.Valid = True

      try:
        self.ufh = open(self.Used, 'a+')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         sys.exit(1)

      try:
        self.sfh = open(self.Stored, 'a+')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         sys.exit(1)

   #------------------------------------------------------------------

   def __str__(self):
      s = "Table: %-22s Type: %-10s" % (self.Name, self.Type)

      if self.Valid:
         s += " * "
         if self.Type == "CSV":
            s += " %7d rows" % len(self.Data)
         elif self.Type == "Sequence":
            s += " Starting value %d" % self.Data
         elif self.Type == "Indexed":
            s += " %7d rows"   % len(self.Data)
         elif self.Type == "Keyed":
            s += " %7d groups" % len(self.Data)
      else:
         s += "   "

      return s

   #------------------------------------------------------------------

   def read_csv(self):
      try:
         f = open(self.File, 'r')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         sys.exit(1)

      self.Data = []

      while True:
         line = f.readline()

         if not line: break

         line = line.strip()

         self.Data.append(line)

      f.close()

      self.Idx = 0

      if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_sequence(self):
      try:
         f = open(self.File, 'r')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         sys.exit(1)

      while True:
         line = f.readline()

         if not line: break

         line = line.strip()

         try:
            no = int(line)
         except:
            no = 0

         self.Data = no

      f.close()

      return 1

   #------------------------------------------------------------------

   def read_keyed(self):
      try:
         f = open(self.File, 'r')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         sys.exit(1)

      groupName  = None
      group      = None

      self.Data  = {}

      while True:
         line = f.readline()

         if not line: break

         line = line.strip()

         if (line.find("[") != -1):
            group_name            = line.replace('[','').replace(']','')
	    group                 = Group(group_name)
            self.Data[group_name] = group
            continue

         elif (line.find("#") != -1):
            continue

         elif (len(line) == 0):
            continue

         else:
            group.append(line)

      f.close()

      if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_indexed(self):
      try:
         f = open(self.File, 'r')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         sys.exit(1)

      self.Data = {}

      while True:
         line = f.readline()

         if not line: break

         line = line.strip()

         try:
            (no, data) = line.split(':')
         except ValueError, e:
            sys.stderr.write('[dserver]  Parse failed (%s): %s \n' % (self.File, str(e)))
            sys.exit(1)

	 self.Data[no] = data

      f.close()

      if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def flush(self):
      if not self.Valid:
         return

      ts = datetime.now().strftime('%Y%m%d%H%M%S')

      self.BackupCmd  = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)

      print "Flushing %s" % self.Name

      if self.Type == "CSV":
         self.flush_csv()
      elif self.Type == "Sequence":
         self.flush_sequence()
      elif self.Type == "Indexed":
         self.flush_indexed()
      elif self.Type == "Keyed":
         self.flush_keyed()

   #------------------------------------------------------------------

   def flush_csv(self):
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         return 0

      i = self.Idx

      while i < len(self.Data):
         f.write("%s\n" % self.Data[i])
         i += 1

      f.close()

   #------------------------------------------------------------------

   def flush_sequence(self):
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         return 0

      f.write("%d\n" % self.Data)

      f.close()

   #------------------------------------------------------------------

   def flush_keyed(self):
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
         return 0

      group_keys = self.Data.keys()

      group_keys.sort()

      for key in group_keys:
         f.write("[%s]\n" % key)

	 group = self.Data[key]

         i = group.Idx

         while i < len(group.Data):
            f.write("%s\n" % group.Data[i])
            i += 1

         f.write("\n")

      f.close()

   #------------------------------------------------------------------

   def flush_indexed(self):
      pass

#=====================================================================

def INFO(msg):
   if log: log.info(' ' + msg)
   if verbose_flg: print "[dserver]  %s" % msg

#---------------------------------------------------------------------

def ERROR(msg, *args):
   """Report an error to the log (when available) and always to stderr.

   msg   -- the error text.
   args  -- optional extra values; each is converted to text and appended
            to msg separated by a blank.  Several callers pass the
            offending data as a second argument, which used to raise
            TypeError because only one parameter was accepted.
   """
   if args:
      msg = "%s %s" % (msg, ' '.join([("%s" % (a,)) for a in args]))

   if log:
      log.error(msg)

   sys.stderr.write('[dserver]  %s\n' % msg)

#---------------------------------------------------------------------

def WARNING(msg):
   if log: log.warning('*****' + msg + '*****')
   if verbose_flg: print "[dserver]  %s" % msg

#=====================================================================

def read_config():
   global PORT

   config_file = data_dir + CONFIGFILE

   try:
      f = open(config_file, 'r')
   except IOError, e:
      ERROR('Open failed: %s' % str(e))
      sys.stderr.write('[dserver]  Open failed: %s\n' % str(e))
      sys.exit(1)

   config_flg     = False
   definition_flg = False

   while True:
      line = f.readline()

      if not line: break

      line = line[:-1]
      line = line.replace('\r','')

      line = line.strip()

      if (line.find("#") != -1): continue

      if (line.find("[Config]") != -1):
         config_flg = True

      elif (line.find("Port=") != -1):
          definition  = line.split("=")

          PORT = int(definition[1].strip())

      if (line.find("[Data]") != -1):
         definition_flg = True

      elif (line.find("Description=") != -1):
          definition  = line.split("=")

          (name, type, delimiter) = definition[1].split(":")

          t = Table(name, type, delimiter)

          INFO(str(t))

          tables.append(t)

   f.close()

#---------------------------------------------------------------------

def get_table_index(name):
   """Return the position in tables of the table called name, or -1."""
   for position, table in enumerate(tables):
      if table.Name == name:
         return position

   return -1

#---------------------------------------------------------------------

def _get_table(handle):
   """Return the table selected by a client-supplied handle, or None.

   handle is the text of a request's handle field.  Only indexes from 0 to
   len(tables)-1 are accepted: the -1 that REG replies for an unknown
   table would otherwise pick the last table through negative indexing.
   """
   try:
      idx = int(handle)
   except ValueError:
      return None

   if 0 <= idx < len(tables):
      return tables[idx]

   return None

#---------------------------------------------------------------------

def process(str):
   """Execute one client request and return the reply text.

   str is the raw request; its fields are separated by '|' and the first
   field is the command:

      REG|name            reply is the handle (index) of table name, or -1
      REGK|.|.  REGI|.    only checked for length; reply is "None"
      GETN|hdl            next row of a CSV table or next Sequence number
      GETK|hdl|grp        next row of group grp of a Keyed table
      GETI|hdl|idx        value stored under idx, or "UNDEFINED"
      STOC|hdl|data       append data to a list table;  reply "1" or "0"
      STOK|hdl|grp|data   append data to group grp;     reply "1" or "0"

   A request with the wrong number of fields is logged through ERROR and
   answered with the failure reply of its command ("-1", "None" or "0")
   instead of being processed further.  Unknown commands and unknown
   handles yield "None" (or "0" for the store commands).
   """
   msg = str.split("|")
   l   = len(msg)

   if debug_level > 1: INFO("[dserver::process] len %d  msg %s" % (l, msg))

   ts    = datetime.now().strftime('%Y%m%d%H%M%S')
   reply = "None"
   cmd   = msg[0]

   if cmd == "REG":
      if l < 2:
         ERROR("[dserver::process]  REG -> Bad Message %s" % (msg,))
         return "-1"

      name = msg[1].replace('\n','').replace('\r','')
      idx  = get_table_index(name)
      if debug_level > 0: INFO("[dserver::process]  REG '%s' -> %d" % (name, idx))

      reply = "%d" % idx

   elif cmd == "REGK":
      if l != 3:
         ERROR("[dserver::process]  REGK -> Bad Message %s" % (msg,))

   elif cmd == "REGI":
      if l != 2:
         ERROR("[dserver::process]  REGI -> Bad Message %s" % (msg,))

   elif cmd == "GETN":
      if l != 2:
         ERROR("[dserver::process]  GETN -> Bad Message %s" % (msg,))
         return reply

      t = _get_table(msg[1])

      if t is not None:
         if t.Type == 'CSV':
            if t.Idx < len(t.Data):
               reply  = t.Data[t.Idx]
               t.Idx += 1
            else:
               reply = "*Exhausted*"
         elif t.Type == "Sequence":
            reply = "%d" % t.Data
            t.Data += 1
         else:
            reply = "UNKNOWN"
         t.ufh.write("%s - %s\n" % (ts, reply))

      if debug_level > 2: INFO("[dserver::process]  GETN -> %s" % reply)

   elif cmd == "GETK":
      if l != 3:
         ERROR("[dserver::process]  GETK -> Bad Message %s" % (msg,))
         return reply

      grp = msg[2]
      t   = _get_table(msg[1])

      if t is not None:
         g = None

         if isinstance(t.Data, dict):
            g = t.Data.get(grp)

         # Only Group objects can be iterated; anything else is "not found".
         if isinstance(g, Group):
            if g.Idx < len(g.Data):
               reply  = g.Data[g.Idx]
               reply  = re.sub(", *", ",", reply)   # drop blanks after commas
               g.Idx += 1
            else:
               reply = "*Exhausted*"
         t.ufh.write("%s - %s::%s\n" % (ts, grp, reply))

      if debug_level > 2: INFO("[dserver::process]  GETK %s -> %s" % (grp, reply))

   elif cmd == "GETI":
      if l != 3:
         ERROR("[dserver::process]  GETI -> Bad Message %s" % (msg,))
         return reply

      idx = msg[2]
      t   = _get_table(msg[1])

      if t is not None:
         value = None

         if isinstance(t.Data, dict):
            value = t.Data.get(idx)

         # A Group cannot be sent as reply text, so treat it as undefined.
         if value is None or isinstance(value, Group):
            reply = "UNDEFINED"
         else:
            reply = value

         t.ufh.write("%s - %s::%s\n" % (ts, idx, reply))

      if debug_level > 2: INFO("[dserver::process]  GETI %s -> %s" % (idx, reply))

   elif cmd == "STOC":
      reply = "0"

      if l != 3:
         ERROR("[dserver::process]  STOC -> Bad Message %s" % (msg,))
         return reply

      data = msg[2]
      t    = _get_table(msg[1])

      # Appending only makes sense for tables whose data is a list of rows.
      if t is not None and isinstance(t.Data, list):
         t.Data.append(data)
         t.sfh.write("%s - %s\n" % (ts, data))
         t.sfh.flush()
         if debug_level > 1: INFO("STOC %s" % data)
         reply = "1"

      if debug_level > 2: INFO("[dserver::process]  STOC %s -> %s" % (data, reply))

   elif cmd == "STOK":
      reply = "0"

      if l != 4:
         ERROR("[dserver::process]  STOK -> Bad Message %s" % (msg,))
         return reply

      grp  = msg[2]
      data = msg[3]
      t    = _get_table(msg[1])

      if t is not None and isinstance(t.Data, dict):
         g = t.Data.get(grp)

         if g is None:                       # first row stored for this group
            g           = Group(grp)
            t.Data[grp] = g

         if isinstance(g, Group):
            g.Data.append(data)
            if debug_level > 1: INFO("STOK %s %s" % (grp, data))
            t.sfh.write("%s - %s::%s\n" % (ts, grp, data))
            t.sfh.flush()                    # same as STOC
            reply = "1"

      if debug_level > 2: INFO("[dserver::process]  STOK %s %s -> %s" % (grp, data, reply))

   return reply

#---------------------------------------------------------------------

def sig_term(signum, frame):
   """SIGTERM handler: shut the server down.

   main() installs this with signal.signal(); the signum and frame
   arguments supplied by the signal machinery are not used.
   """

   shutdown()

#---------------------------------------------------------------------

def shutdown():
   INFO("Server shutdown at %s" % datetime.now())

   print "\n"

   for i in range(len(tables)):
      tables[i].flush()

   print "*SHUTDOWN*"

   try:
      os.unlink(pid_path)
   except IOError, e:
      ERROR('Unlink failed: %s' % str(e))
      sys.exit(1)

   sys.exit(0)

#---------------------------------------------------------------------

def check_running():
   try:
      pfp = open(pid_path, 'r')
   except IOError, (errno, strerror):
      pfp = None
      # ERROR("I/O error(%s): %s" % (errno, strerror))
   except:
      ERROR("Unexpected error: %s" % sys.exc_info()[0])
      raise

   if pfp:
      line = pfp.readline()
      line = line.strip()

      dserver_pid   = int(line)

      noProcess    = 0

      try:
         os.kill(dserver_pid, 0)
      except OSError, e:
         if e.errno == 3:
            noProcess = 1
         else:
            ERROR("kill() failed: %s" % str(e))
            sys.exit(0)

      if noProcess:
         INFO("[dserver]  Stale dserver pid file!")
         pfp.close()
         os.unlink(pid_path)

         return None
      else:
         pfp.close()
         return dserver_pid

      return dserver_pid
   else:
      return None

#---------------------------------------------------------------------

def create_pidfile():
   pid = os.getpid()

   try:
      pfp = open(pid_path, 'w')
   except IOError, e:
      ERROR("Open failed: %s" % str(e))
      sys.exit(0)

   pfp.write("%d" % pid)

   pfp.close()

   INFO("Running server with PID -> %d" % pid)

   return pid

#---------------------------------------------------------------------

def become_daemon():
   pid = os.fork()

   if pid == 0:                                             # In child
      pid = create_pidfile()
      time.sleep(1)
   elif pid == -1:                                # Should not happen!
      ERROR("fork() failed!")
      time.sleep(1)
      sys.exit(0)
   else:                                                   # In Parent
      time.sleep(1)
      sys.exit(0)

   time.sleep(2)

   os.setsid()

   return pid

#---------------------------------------------------------------------

def init():
   pid = check_running()

   if pid:
      print "[dserver]  Server already running! (pid = %d)" % pid
      sys.exit(0)

   if daemon_flg:
      pid = become_daemon()
   else:
      pid = create_pidfile()

   global log

   log  = logging.getLogger('dserver')
   hdlr = logging.FileHandler(LOGFILE)
   fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

   hdlr.setFormatter(fmtr)
   log.addHandler(hdlr) 
   log.setLevel(logging.INFO)

   INFO("Started processing")

   read_config()

   if (not silent_flg):
      INFO("Server PID is %d" % pid)

   print "\nData Loaded..."

#---------------------------------------------------------------------

def terminate():
   dserver_pid = check_running()

   if dserver_pid:
      if (not silent_flg):
         INFO("Terminating server with pid, %d" % dserver_pid)

      os.kill(dserver_pid, signal.SIGTERM)

      if (wait_flg):
         while True:
            try:
               kill(dserver_pid, 0)
            except OSError, e:
               if e.errno == 3:
                  break
               else:
                  ERROR("kill() failed: %s" % str(e))
                  sys.exit(0)
 
            time.sleep(1)

   return 0

#---------------------------------------------------------------------

def check():
   pid = check_running()

   if pid:
      print "[dserver]  Server already running! (pid = %d)" % pid
      sys.exit(0)
   else:
      print "[dserver]  Server not running"

#==== Socket Server ==================================================

def init_connection():
   """Create the listening TCP socket and store it in the global sockobj."""
   global sockobj

   sockobj = socket(AF_INET, SOCK_STREAM)  # make a TCP socket object

   # Allow the port to be re-bound straight after a restart; without this
   # bind() can fail with "Address already in use" while old connections
   # are still in TIME_WAIT.
   sockobj.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

   sockobj.bind((HOST, PORT))              # bind it to server port number
   sockobj.listen(10)                      # allow upto 10 pending connects

#---------------------------------------------------------------------

def handle_client(connection):             # in spawned thread: reply
   """Serve one client: answer each request until the connection ends.

   connection is the connected socket returned by accept().  Each chunk
   read (up to 1024 bytes) is handed to process() and the reply is sent
   back.  The loop ends when the peer closes the connection or a socket
   operation fails; the socket is always closed, also when process()
   raises.
   """
   try:
      while True:                          # read, write a client socket
         try:
            request = connection.recv(1024)
         except Exception:
            break

         if debug_level > 0: INFO('[dserver]  Request -> "%s"' % request)

         if not request: break

         reply = process(request)

         if debug_level > 0: INFO('[dserver]  Reply   -> "%s..."' % reply[0:30])

         try:
            # sendall() keeps sending until the whole reply has gone out;
            # send() may transmit only part of it.
            connection.sendall(reply)
         except Exception:
            break
   finally:
      connection.close()

#---------------------------------------------------------------------

def dispatcher():
   """Accept connections forever, starting one thread per client."""
   while True:
      # Block until the next client connects.
      client, peer = sockobj.accept()

      INFO('Host (%s) - Connected at %s' % (peer[0], datetime.now()))

      thread.start_new_thread(handle_client, (client,))

#=====================================================================

def main():
   """
   Command line entry point for the data server.

   Options:
      -c   report whether a server is running, then return
      -d   raise the debug level by one (repeatable)
      -D   run as a daemon
      -s   set silent_flg (suppresses some informational messages)
      -T   terminate the running server
      -v   verbose
      -V   print the version and return
      -w   with -T, wait until the server has gone
      -?   print the module documentation and return

   DSERVER_DIR must be set in the environment.  The working directory
   is changed to "<cwd>/DATA/", which also holds the pid file.

   Returns the process exit status.
   """
   global check_flg
   global daemon_flg
   global silent_flg
   global terminate_flg
   global verbose_flg
   global wait_flg
   global debug_level
   global dserver_dir
   global data_dir
   global pid_path

   try:
      opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?")
   except getopt.error:
      print(__doc__)
      return 1

   try:
      dserver_dir  = os.environ["DSERVER_DIR"]
   except KeyError:
      print("Set DSERVER_DIR environment variable and rerun!")
      return 1

   wrk_path  = os.getcwd()

   # data_dir = dserver_dir  + '/DATA/'

   data_dir = wrk_path + '/DATA/'
   pid_path = data_dir + PIDFILE

   os.chdir(data_dir)

   for o, a in opts:
      if o == '-d':
         debug_level   += 1
      elif o == '-c':
         check_flg      = True
      elif o == '-D':
         daemon_flg     = True
      elif o == '-s':
         # Was misspelt 'tsilent_flg' and not declared global, so the
         # option never had any effect.
         silent_flg     = True
      elif o == '-T':
         terminate_flg  = True
      elif o == '-v':
         verbose_flg    = True
      elif o == '-V':
         print("[dserver]  Version: %s" % __version__)
         return 1
      elif o == '-w':
         wait_flg       = True
      elif o == '-?':
         print(__doc__)
         return 1

   print("[dserver]  Listening on port %s - running from %s" % (PORT, os.getcwd()))

   if check_flg:
      check()
      return 0

   if terminate_flg:
      terminate()
      return 0

   if (debug_level > 0): print("Debugging level set to %d" % debug_level)

   for arg in args:
      print(arg)

   signal.signal(signal.SIGTERM, sig_term)

   init()

   init_connection()

   dispatcher()

   return 0

#---------------------------------------------------------------------

if __name__ in ('__main__', sys.argv[0]):
   # Script entry: exit with main()'s status.  Ctrl-C goes through
   # shutdown() rather than ending in a traceback.
   try:
      sys.exit(main())
   except KeyboardInterrupt:
      print("[dserver]  Interrupted!")
      shutdown()

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation
   20080609   plh   Added exception handling to read_indexed()
   20080609   plh   Reformatted exception strings
   20080610   plh   Reformatted log text for load
   20080610   plh   Reviewed __id__ and __version__ strings

Problems to fix:

To Do:

Issues:

"""
</pre>

=2.1.0=

<pre>

#!/usr/bin/env python
#
#       Author:  Peter Harding  <plh@performiq.com.au>
#                PerformIQ Pty. Ltd.
#                Level 6, 170 Queen Street,
#                MELBOURNE, VIC, 3000
#
#                Phone:   03 9641 2222
#                Fax:     03 9641 2200
#                Mobile:  0418 375 085
#
#          Copyright (C) 1994-2008, Peter Harding
#                        All rights reserved
#
#  Purpose: Threaded data server implementation
#
#---------------------------------------------------------------------

"""
  Threaded server model

  Server side: open a socket on a port, listen for
  a message from a client, and accept a request and
  service it.

  The server spawns a thread to handle each client connection.
  Threads share global memory space with main thread;
  This is more portable than fork -- not yet on Windows;

  This version has been extended to use the standard Python
  logging module.

  Add the delimiter to the INI file to allow use of alternate
  delimiters in transmitted data - so data with embedded commas
  can be used.
"""

#---------------------------------------------------------------------

import re
import os
import csv
import sys
import time
import getopt
import signal
import thread
import marshal
import logging

#---------------------------------------------------------------------

from socket   import *          # get socket constructor and constants
from datetime import datetime

#---------------------------------------------------------------------

# Version / identification strings.
__cvsid__         = "$Id:$"
__version__       = "2.1.0"
__id__            = "@(#)  dserver.py  [2.1.0]  2008-05-10"

# Command line switches; main() sets them from the -c, -D, -s, -T, -v
# and -w options respectively.
check_flg         = False
daemon_flg        = False
silent_flg        = False
terminate_flg     = False
verbose_flg       = False
wait_flg          = False

# Incremented once per -d option; higher values enable more INFO output.
debug_level       = 0

HOST              = ''             #  Bind address - '' means all local interfaces (INADDR_ANY)
PORT              = 9572           #  Listen on a non-reserved port number

# Run-time state filled in during start-up.
sockobj           = None           #  Listening socket, set by init_connection()
dserver_dir       = None           #  Value of $DSERVER_DIR, set by main()
data_dir          = None           #  "<cwd>/DATA/", set by main()
pid_path          = None           #  data_dir + PIDFILE, set by main()
client_language   = None           #  Set by the INIT request in process()
log               = None           #  logging.Logger, set by init()
sources           = []             #  Source objects appended by read_config()

# File names, resolved relative to data_dir.
CONFIGFILE        = "dserver.ini"
LOGFILE           = "dserver.log"
PIDFILE           = "dserver.pid"

INVALID           = 'INVALID'
# Keys looked up in a source's attribute dictionary (see Source.__init__).
DELIMITER         = 'delimiter'
TAG_DELIMITER     = 'tag_delimiter'

# Matches data-file lines that start with '#'.
COMMENT           = re.compile('^#')

#=====================================================================

class Group:
   """
   A named list of data rows with a read cursor.

   Name      group name
   Idx       cursor into Data; set() resets it to 0, or to -1 when
             the group holds no rows
   Data      list of rows
   Comments  comment lines belonging to the group
   """
   Name     = None
   Idx      = None
   Data     = None

   def __init__(self, name):
      self.Name, self.Idx      = name, 0
      self.Data, self.Comments = [], []

   def __str__(self):
      return "Grp %s  Len %d" % (self.Name, len(self.Data))

   def append(self, s):
      """Add row s to the end of the group."""
      self.Data.append(s)

   def set(self):
      """Rewind the cursor: 0 when rows exist, -1 for an empty group."""
      if len(self.Data) == 0:
         self.Idx = -1
      else:
         self.Idx = 0

#---------------------------------------------------------------------

class Source:
   """
   One named data source, loaded from "<name>.dat" in the current
   directory.

   Supported source types and the shape of self.Data after loading:

      CSV        list of row strings, served in order from self.Idx
      Sequence   a single integer counter
      Indexed    dict mapping tag -> data string
      Keyed      dict mapping group name -> Group

   Lines starting with '#' are kept in self.Comments (or in the
   current Group's Comments for Keyed sources) so the flush methods
   can write them back.  Each source also opens "<name>.used"
   (self.ufh) and "<name>.stored" (self.sfh) in append mode.

   NOTE(review): nothing in this class sets self.Valid to True, so
   flush() and the detail part of __str__() stay inactive unless a
   caller sets it - confirm whether that is intended.
   """
   Count    = 0
   Valid    = False
   Name     = None
   Type     = None
   Idx      = None
   Data     = None

   def __init__(self, name, source_type, attributes=None, delimiter=None):
      """
      Load the source called 'name'.

      name         base name of the .dat / .used / .stored files
      source_type  "CSV", "Sequence", "Indexed" or "Keyed"; anything
                   else prints a message and exits with status 1
      attributes   optional dict; may carry DELIMITER and, for Indexed
                   sources, TAG_DELIMITER entries
      delimiter    field delimiter; overrides the attributes entry

      Exits with status 1 when a required file cannot be opened.
      """
      # A fresh dict per call - a mutable default would be shared by
      # every Source created without attributes.
      if attributes is None:
         attributes = {}

      self.Name       = name
      self.Type       = source_type
      self.File       = name + ".dat"
      self.Used       = name + ".used"
      self.Stored     = name + ".stored"
      self.Comments   = []

      if delimiter:
         self.Delimiter  = delimiter
      else:
         self.Delimiter  = attributes.get(DELIMITER, ',')

      if self.Type == "CSV":
         rc = self.read_csv()

      elif self.Type == "Sequence":
         rc = self.read_sequence()

      elif self.Type == "Indexed":
         self.tag_delimiter  = attributes.get(TAG_DELIMITER, ':')
         rc = self.read_indexed()

      elif self.Type == "Keyed":
         rc = self.read_keyed()

      else:
         print("[dserver]  Bad source_type [%s]" % source_type)
         sys.exit(1)

      self.Size        = rc
      self.Attributes  = {
                            'Type'       : self.Type,
                            'Delimiter'  : self.Delimiter,
                            'Size'       : rc
                         }

      self.ufh = self._open_or_exit(self.Used, 'a+')
      self.sfh = self._open_or_exit(self.Stored, 'a+')

   #------------------------------------------------------------------

   def __str__(self):
      s = "Source: %-10s Type: %-10s" % (self.Name, self.Type)

      if self.Valid:
         s += " * "
         if self.Type == "CSV":
            s += " %d rows" % len(self.Data)
         elif self.Type == "Sequence":
            s += " Starting value %d" % self.Data
         elif self.Type == "Indexed":
            s += " %d rows"   % len(self.Data)
         elif self.Type == "Keyed":
            s += " %d groups" % len(self.Data)
      else:
         s += "   "

      return s

   #------------------------------------------------------------------

   def _open_or_exit(self, path, mode):
      """Open path in mode; on IOError report on stderr and exit(1)."""
      try:
         return open(path, mode)
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         sys.exit(1)

   #------------------------------------------------------------------

   def _read_lines(self):
      """Return all lines of the data file, whitespace-stripped."""
      f = self._open_or_exit(self.File, 'r')

      try:
         return [line.strip() for line in f]
      finally:
         f.close()

   #------------------------------------------------------------------

   def read_csv(self):
      """
      Load the rows of a CSV source into self.Data (a list) and reset
      self.Idx to 0.  Returns the number of rows.
      """
      self.Data = []

      for line in self._read_lines():
         if not line:
            # A blank row would later be served as an empty reply,
            # and an empty send() delivers nothing to the client.
            continue

         if COMMENT.match(line):
            self.Comments.append(line)
            continue

         self.Data.append(line)

      self.Idx = 0

      if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_sequence(self):
      """
      Load the counter of a Sequence source into self.Data.

      The last non-blank, non-comment line wins; a line that is not an
      integer yields 0, as does a file without any value.  Returns 1.
      """
      self.Data = 0

      for line in self._read_lines():
         if not line:
            # Skip blanks - a trailing empty line used to reset the
            # counter to 0.
            continue

         if COMMENT.match(line):
            self.Comments.append(line)
            continue

         try:
            self.Data = int(line)
         except ValueError:
            self.Data = 0

      return 1

   #------------------------------------------------------------------

   def read_keyed(self):
      """
      Load a Keyed source: "[name]" lines open a Group, following
      lines are that group's rows.  Returns the number of groups.
      """
      group      = None

      self.Data  = {}

      for line in self._read_lines():
         if not line:
            continue

         # Comments are tested before headers so that a comment which
         # happens to contain '[' is not mistaken for a group.
         if COMMENT.match(line):
            if group is not None:
               group.Comments.append(line)
            else:
               self.Comments.append(line)
            continue

         # Headers are recognised at the start of the line only (the
         # form flush_keyed() writes), so rows containing '[' stay data.
         if line.startswith('['):
            group_name            = line.replace('[','').replace(']','')
            group                 = Group(group_name)
            self.Data[group_name] = group
            continue

         if group is None:
            # A row before the first header has no group to join.
            sys.stderr.write('[dserver]  Row outside any group in %s: "%s"\n' % (self.File, line))
            continue

         group.append(line)

      if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_indexed(self):
      """
      Load an Indexed source: each row is "<tag><tag_delimiter><data>".
      Returns the number of tags.
      """
      self.Data = {}

      for line in self._read_lines():
         if not line:
            continue

         if COMMENT.match(line):
            self.Comments.append(line)
            continue

         if self.tag_delimiter not in line:
            sys.stderr.write('[dserver]  Bad indexed row in %s: "%s"\n' % (self.File, line))
            continue

         # Split on the first delimiter only, so the data part may
         # itself contain the delimiter (e.g. a URL with ':').
         (tag, data) = line.split(self.tag_delimiter, 1)

         self.Data[tag.strip()] = data

      if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def flush(self):
      """
      Write the source back to its .dat file, after preparing a "cp"
      command that backs the file up under a timestamp suffix.
      Does nothing unless self.Valid is set.
      """
      if not self.Valid:
         return

      ts = datetime.now().strftime('%Y%m%d%H%M%S')

      self.BackupCmd  = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)

      if self.Type == "CSV":
         self.flush_csv()
      elif self.Type == "Sequence":
         self.flush_sequence()
      elif self.Type == "Indexed":
         self.flush_indexed()
      elif self.Type == "Keyed":
         self.flush_keyed()

   #------------------------------------------------------------------

   def _reopen_for_flush(self):
      """
      Run the backup command prepared by flush() and reopen the data
      file for writing.  Returns the open file, or None when the open
      fails (the failure is reported on stderr).
      """
      os.system(self.BackupCmd)

      try:
         return open(self.File, 'wb')
      except IOError as e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return None

   #------------------------------------------------------------------

   def flush_csv(self):
      """Write comments, then the rows from self.Idx onwards."""
      f = self._reopen_for_flush()

      if f is None:
         return 0

      try:
         for line in self.Comments:
            f.write("%s\n" % line)

         for i in range(self.Idx, len(self.Data)):
            f.write("%s\n" % self.Data[i])
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_sequence(self):
      """Write comments, then the current counter value."""
      f = self._reopen_for_flush()

      if f is None:
         return 0

      try:
         for line in self.Comments:
            f.write("%s\n" % line)

         f.write("%d\n" % self.Data)
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_keyed(self):
      """
      Write the source comments, then every group in name order: its
      "[name]" header, its comments, its rows from group.Idx onwards
      and a blank separator line.
      """
      f = self._reopen_for_flush()

      if f is None:
         return 0

      try:
         for line in self.Comments:
            f.write("%s\n" % line)

         for key in sorted(self.Data.keys()):
            f.write("[%s]\n" % key)

            group = self.Data[key]

            for line in group.Comments:
               f.write("%s\n" % line)

            for i in range(group.Idx, len(group.Data)):
               f.write("%s\n" % group.Data[i])

            f.write("\n")
      finally:
         f.close()

   #------------------------------------------------------------------

   def flush_indexed(self):
      """Indexed sources are not written back."""
      pass

#=====================================================================

def INFO(msg):
   """
   Record an informational message.

   The text goes to the 'log' logger at INFO level once logging has
   been set up, and is echoed to stdout when verbose_flg is set.
   """
   if log:
      log.info(' ' + msg)

   if verbose_flg:
      print("[dserver]  %s" % msg)

#---------------------------------------------------------------------

def ERROR(msg, *details):
   """
   Report an error on stderr and, once logging is set up, in the log.

   msg      the message text
   details  optional extra values, appended to the message separated
            by single spaces.  Several callers in this file pass a
            second argument, which used to raise TypeError.
   """
   if details:
      msg = ' '.join([str(msg)] + [str(d) for d in details])

   if log: log.error(msg)
   sys.stderr.write('[dserver]  %s\n' % msg)

#---------------------------------------------------------------------

def WARNING(msg):
   """
   Record a warning.

   The text is logged at WARNING level, wrapped in '*****' markers,
   once logging has been set up, and is echoed unwrapped to stdout
   when verbose_flg is set.
   """
   if log:
      log.warning('*****' + msg + '*****')

   if verbose_flg:
      print("[dserver]  %s" % msg)

#=====================================================================

def read_config():
   """
   Read data_dir + CONFIGFILE and apply it.

   Recognised lines:

      Port=<number>
            replaces the module-level PORT
      Description=<name>:<type>[:<attributes>]
            creates a Source and appends it to 'sources'; <attributes>
            is a Python dictionary literal

   Any line containing '#' is skipped.  "[Config]" / "[Data]" section
   headers are accepted but carry no meaning.

   Exits with status 1 when the file cannot be opened or the port is
   not a number.
   """
   global PORT

   config_file = data_dir + CONFIGFILE

   try:
      f = open(config_file, 'r')
   except IOError as e:
      ERROR('Open failed: ' + str(e))
      sys.exit(1)

   try:
      for raw in f:
         # strip() removes the line ending; slicing off the last
         # character would eat data when the final line has no newline.
         line = raw.replace('\r', '').strip()

         if '#' in line: continue

         if "Port=" in line:
            value = line.split("=", 1)[1].strip()

            # bind() needs an integer port, not the text from the file.
            try:
               PORT = int(value)
            except ValueError:
               ERROR('Bad Port value in %s: "%s"' % (config_file, value))
               sys.exit(1)

         elif "Description=" in line:
            # maxsplit keeps '=' and ':' inside the attributes intact.
            parts = line.split("=", 1)[1].split(":", 2)

            if len(parts) < 2:
               ERROR('Bad Description in %s: "%s"' % (config_file, line))
               continue

            name, source_type = parts[0], parts[1]

            if len(parts) > 2:
               attribute_str = parts[2]
            else:
               attribute_str = ''

            # NOTE(review): eval() executes whatever the config file
            # contains - acceptable only while that file is trusted.
            try:
               attributes = eval(attribute_str)
            except Exception:
               attributes = {}

            if not isinstance(attributes, dict):
               attributes = {}

            t = Source(name, source_type, attributes)

            INFO(str(t))

            sources.append(t)
   finally:
      f.close()

#---------------------------------------------------------------------

def get_source_index(name):
   """
   Return the position in 'sources' of the first source whose Name
   equals name, or -1 when there is none.
   """
   for position, source in enumerate(sources):
      if source.Name == name:
         return position

   return -1

#---------------------------------------------------------------------

# Exact number of '|' separated fields required by each fixed-format
# command (the command word itself included).
_PROCESS_FIELD_COUNTS = {
   "REGK" : 3,
   "REGI" : 2,
   "GETN" : 2,
   "GETK" : 3,
   "GETI" : 3,
   "STOC" : 3,
   "STOK" : 4,
}

#---------------------------------------------------------------------

def _source_for_handle(handle):
   """Return the source for a handle string, or None when it is invalid."""
   try:
      hdl = int(handle)
   except ValueError:
      return None

   # Reject negatives explicitly: sources[-1] would silently address
   # the last source, e.g. after a failed REG returned -1.
   if 0 <= hdl < len(sources):
      return sources[hdl]

   return None

#---------------------------------------------------------------------

def process(str):
   """
   Execute one client request and return the reply string.

   The request is a '|' separated message whose first field is the
   command:

      INIT|<language>            remember the client language
      REG|<name>                 reply with the source handle (index),
                                 plus marshalled attributes when the
                                 client language is 'Python'
      REGK|..|..  REGI|..        accepted, no action
      GETN|<hdl>                 next CSV row / Sequence value
      GETK|<hdl>|<group>         next row of a Keyed group
      GETI|<hdl>|<tag>           data stored under an Indexed tag
      STOC|<hdl>|<data>          append a row to a CSV source
      STOK|<hdl>|<group>|<data>  append a row to a Keyed group

   Replies: the data itself, "*Exhausted*" when a list has run out,
   "UNKNOWN" / "UNDEFINED" when the source cannot answer, "1" / "0"
   for a successful / failed store, and "None" otherwise.  A message
   with the wrong number of fields is logged and answered with the
   default reply for its command.

   NOTE(review): client_language is one module-level value, so it is
   shared by every connected client.
   """
   global client_language

   msg = str.split("|")
   l   = len(msg)
   cmd = msg[0]

   if debug_level > 1: INFO("[dserver::process] len %d  msg %s" % (l, msg))

   ts    = datetime.now().strftime('%Y%m%d%H%M%S')

   if cmd in ("STOC", "STOK"):
      reply = "0"
   else:
      reply = "None"

   wanted = _PROCESS_FIELD_COUNTS.get(cmd)

   if wanted is None:
      malformed = cmd in ("INIT", "REG") and l < 2
   else:
      malformed = (l != wanted)

   if malformed:
      ERROR("[dserver::process]  %s -> Bad Message %s" % (cmd, msg))
      return reply

   if cmd == "INIT":
      client_language = msg[1]

   elif cmd == "REG":
      name = msg[1].replace('\n','').replace('\r','')
      idx  = get_source_index(name)
      if debug_level > 0: INFO("[dserver::process]  REG '%s' -> %d" % (name, idx))

      if client_language == 'Python':
         # An unknown name (idx == -1) must not pick up the attributes
         # of sources[-1]; send an empty dictionary instead.
         if idx >= 0:
            attributes = sources[idx].Attributes
         else:
            attributes = {}
         reply = "%d|%s" % (idx, marshal.dumps(attributes))
      else:
         reply = "%d" % idx

   elif cmd == "GETN":
      t = _source_for_handle(msg[1])

      if t is not None:
         if t.Type == 'CSV':
            if (t.Idx < len(t.Data)):
               reply  = t.Data[t.Idx]
               t.Idx += 1
            else:
               reply = "*Exhausted*"
         elif t.Type == "Sequence":
            reply = "%d" % t.Data
            t.Data += 1
         else:
            reply = "UNKNOWN"
         t.ufh.write("%s - %s\n" % (ts, reply))

      if debug_level > 2: INFO("[dserver::process]  GETN -> %s" % reply)

   elif cmd == "GETK":
      grp  = msg[2]
      t    = _source_for_handle(msg[1])

      if t is not None:
         # Only Keyed sources hold Group objects.
         if t.Type == "Keyed":
            g = t.Data.get(grp)
         else:
            g = None

         if g is not None:
            if (g.Idx < len(g.Data)):
               reply  = g.Data[g.Idx]
               g.Idx += 1
            else:
               reply = "*Exhausted*"
         t.ufh.write("%s - %s::%s\n" % (ts, grp, reply))

      if debug_level > 2: INFO("[dserver::process]  GETK %s -> %s" % (grp, reply))

   elif cmd == "GETI":
      idx  = msg[2]
      t    = _source_for_handle(msg[1])

      if t is not None:
         # Only Indexed sources map tags to reply strings.
         if t.Type == "Indexed":
            reply = t.Data.get(idx, "UNDEFINED")
         else:
            reply = "UNDEFINED"

         t.ufh.write("%s - %s::%s\n" % (ts, idx, reply))

      if debug_level > 2: INFO("[dserver::process]  GETI %s -> %s" % (idx, reply))

   elif cmd == "STOC":
      data  = msg[2]
      t     = _source_for_handle(msg[1])

      # Only a CSV source keeps its rows in a list that can be appended to.
      if t is not None and t.Type == "CSV":
         t.Data.append(data)
         t.sfh.write("%s - %s\n" % (ts, data))
         t.sfh.flush()
         if debug_level > 1: INFO("STOC %s" % data)
         reply = "1"

      if debug_level > 2: INFO("[dserver::process]  STOC %s -> %s" % (data, reply))

   elif cmd == "STOK":
      grp  = msg[2]
      data = msg[3]
      t    = _source_for_handle(msg[1])

      if t is not None and t.Type == "Keyed":
         if grp in t.Data:
            g = t.Data[grp]
         else:
            g           = Group(grp)
            t.Data[grp] = g

         g.Data.append(data)
         if debug_level > 1: INFO("STOK %s %s" % (grp, data))
         t.sfh.write("%s - %s::%s\n" % (ts, grp, data))
         t.sfh.flush()
         reply = "1"

      if debug_level > 2: INFO("[dserver::process]  STOK %s %s -> %s" % (grp, data, reply))

   return reply

#---------------------------------------------------------------------

def sig_term(signum, frame):
   """
   SIGTERM handler: run the normal shutdown() sequence.

   Both arguments (signal number and interrupted frame) are unused.
   """
   shutdown()

#---------------------------------------------------------------------

def shutdown():
   """
   Stop the server: log the shutdown, flush every source, remove the
   pid file and exit.

   Exits with status 0, or with status 1 when the pid file cannot be
   removed.
   """
   INFO("Server shutdown at %s" % datetime.now())

   for source in sources:
      source.flush()

   try:
      os.unlink(pid_path)
   except OSError as e:
      # os.unlink() reports failure with OSError; the previous
      # 'except IOError' never matched it on Python 2.
      ERROR('Unlink failed: ' + str(e))
      sys.exit(1)

   sys.exit(0)

#---------------------------------------------------------------------

def check_running():
   """
   Return the pid recorded in the pid file if that process exists,
   otherwise None.

   A pid file that names a process which no longer exists, or that
   does not contain a number, is treated as stale: it is removed and
   None is returned.  Any other failure of the kill(pid, 0) probe is
   reported and the program exits with status 0.
   """
   import errno

   try:
      pfp = open(pid_path, 'r')
   except IOError:
      return None                  # no readable pid file

   try:
      line = pfp.readline().strip()
   finally:
      pfp.close()

   try:
      dserver_pid = int(line)
   except ValueError:
      dserver_pid = None           # empty or corrupt pid file

   if dserver_pid is not None:
      try:
         os.kill(dserver_pid, 0)   # signal 0: existence check only
      except OSError as e:
         if e.errno != errno.ESRCH:
            ERROR("kill() failed:" + str(e))
            sys.exit(0)
         dserver_pid = None        # ESRCH: no such process

   if dserver_pid is None:
      INFO("[dserver]  Stale dserver pid file!")
      os.unlink(pid_path)

   return dserver_pid

#---------------------------------------------------------------------

def create_pidfile():
   """
   Write the pid of the current process to pid_path and return it.

   If the pid file cannot be opened the error is reported and the
   program exits with status 0.
   """
   my_pid = os.getpid()

   try:
      handle = open(pid_path, 'w')
   except IOError as e:
      ERROR("Open failed - " + str(e))
      sys.exit(0)

   handle.write("%d" % my_pid)
   handle.close()

   INFO("Running server with PID -> %d" % my_pid)

   return my_pid

#---------------------------------------------------------------------

def become_daemon():
   """
   Fork into the background.

   The parent exits with status 0 after one second.  The child writes
   the pid file, waits, starts a new session with os.setsid() and
   returns its own pid.  If fork() fails the error is reported and the
   program exits with status 0.
   """
   try:
      pid = os.fork()
   except OSError as e:
      # os.fork() signals failure by raising; it never returns -1.
      ERROR("fork() failed! (%s)" % str(e))
      time.sleep(1)
      sys.exit(0)

   if pid != 0:                                            # In Parent
      time.sleep(1)
      sys.exit(0)

   pid = create_pidfile()                                   # In child
   time.sleep(1)

   time.sleep(2)

   os.setsid()

   return pid

#---------------------------------------------------------------------

def init():
   """
   Prepare the process to act as the server.

   Exits with status 0 when a server is already running.  Otherwise
   the pid file is created (after daemonising when daemon_flg is set),
   the 'dserver' logger is attached to LOGFILE at INFO level and the
   configuration is read.
   """
   global log

   running_pid = check_running()

   if running_pid:
      print("[dserver]  Server already running! (pid = %d)" % running_pid)
      sys.exit(0)

   if daemon_flg:
      pid = become_daemon()
   else:
      pid = create_pidfile()

   log       = logging.getLogger('dserver')
   handler   = logging.FileHandler(LOGFILE)
   formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

   handler.setFormatter(formatter)
   log.addHandler(handler)
   log.setLevel(logging.INFO)

   INFO("Started processing")

   read_config()

   if not silent_flg:
      INFO("Server PID is %d" % pid)

#---------------------------------------------------------------------

def terminate():
   """
   Send SIGTERM to the running server, if there is one.

   When wait_flg is set, the process is polled once a second until it
   no longer exists.  Any unexpected failure of that poll is reported
   and the program exits with status 0.

   Returns 0.
   """
   import errno

   dserver_pid = check_running()

   if not dserver_pid:
      return 0

   if (not silent_flg):
      INFO("Terminating server with pid, %d" % dserver_pid)

   os.kill(dserver_pid, signal.SIGTERM)

   if (wait_flg):
      while True:
         try:
            # Was a bare kill(), which is undefined here and raised
            # NameError as soon as -w was used.
            os.kill(dserver_pid, 0)
         except OSError as e:
            if e.errno == errno.ESRCH:     # process has gone
               break

            ERROR("kill() failed:" + str(e))
            sys.exit(0)

         time.sleep(1)

   return 0

#---------------------------------------------------------------------

def check():
   """
   Report on stdout whether a dserver instance is currently running.

   The pid comes from check_running().  When a server is found its pid
   is printed and the process exits with status 0; otherwise a notice
   is printed and the function returns None.
   """
   pid = check_running()

   if not pid:
      print("[dserver]  Server not running")
      return

   print("[dserver]  Server already running! (pid = %d)" % pid)
   sys.exit(0)

#==== Socket Server ==================================================

def init_connection():
   """
   Create the server's listening TCP socket, bound to (HOST, PORT)
   with a backlog of 10, and store it in the module-level 'sockobj'.
   """
   global sockobj

   sockobj = socket(AF_INET, SOCK_STREAM)  # make a TCP socket object

   # read_config() takes the port from the text of the INI file, so
   # PORT may be a string here; bind() needs an integer.
   sockobj.bind((HOST, int(PORT)))         # bind it to server port number
   sockobj.listen(10)                      # allow upto 10 pending connects

#---------------------------------------------------------------------

def handle_client(connection):             # in spawned thread: reply
   """
   Service one client connection until the peer disconnects.

   Each request of up to 1024 bytes is passed to process() and the
   reply is sent back.  The loop ends when recv() returns no data or a
   socket error occurs.  The connection is always closed on the way
   out, including when process() raises.

   connection -- connected socket object as returned by accept()
   """
   try:
      while True:                          # read, write a client socket
         try:
            request = connection.recv(1024)
         except error:                     # socket.error via the star import
            break

         if debug_level > 0: INFO('[dserver]  Request -> "%s"' % request)

         if not request: break             # peer closed the connection

         reply = process(request)

         if debug_level > 0: INFO('[dserver]  Reply   -> "%s..."' % reply[0:30])

         try:
            # sendall() keeps writing until the whole reply has gone out;
            # send() is allowed to transmit only part of it.
            connection.sendall(reply)
         except error:
            break
   finally:
      connection.close()

#---------------------------------------------------------------------

def dispatcher():
   """
   Accept client connections forever on the module-level 'sockobj'.

   Every accepted connection is logged with the peer's host address
   and handed to handle_client() running in a new thread.
   """
   while True:
      # Block until the next client connects.
      client_sock, peer = sockobj.accept()
      peer_host         = peer[0]

      INFO('Host (%s) - Connected at %s' % (peer_host, datetime.now()))

      thread.start_new_thread(handle_client, (client_sock,))

#=====================================================================

def main():
   """
   Command line entry point for the data server.

   Options:
      -c   report whether a server is running, then return
      -d   raise the debug level by one (repeatable)
      -D   run as a daemon
      -s   set silent_flg, which stops init() and terminate() from
           logging the server pid
      -T   terminate the running server
      -v   verbose: INFO and WARNING messages are echoed to stdout
      -V   print the version and return
      -w   with -T, wait until the server has gone
      -?   print the module documentation and return

   DSERVER_DIR must be set in the environment.  The working directory
   is changed to "<cwd>/DATA/", which also holds the pid file.

   Returns the process exit status.
   """
   global check_flg
   global daemon_flg
   global silent_flg
   global terminate_flg
   global verbose_flg
   global wait_flg
   global debug_level
   global dserver_dir
   global data_dir
   global pid_path

   try:
      opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?")
   except getopt.error:
      print(__doc__)
      return 1

   try:
      dserver_dir  = os.environ["DSERVER_DIR"]
   except KeyError:
      print("Set DSERVER_DIR environment variable and rerun!")
      return 1

   wrk_path  = os.getcwd()

   # data_dir = dserver_dir  + '/DATA/'

   data_dir = wrk_path + '/DATA/'
   pid_path = data_dir + PIDFILE

   os.chdir(data_dir)

   for o, a in opts:
      if o == '-d':
         debug_level   += 1
      elif o == '-c':
         check_flg      = True
      elif o == '-D':
         daemon_flg     = True
      elif o == '-s':
         # Was misspelt 'tsilent_flg' and not declared global, so the
         # option never had any effect.
         silent_flg     = True
      elif o == '-T':
         terminate_flg  = True
      elif o == '-v':
         verbose_flg    = True
      elif o == '-V':
         print("[dserver]  Version: %s" % __version__)
         return 1
      elif o == '-w':
         wait_flg       = True
      elif o == '-?':
         print(__doc__)
         return 1

   print("[dserver]  Listening on port %s - running from %s" % (PORT, os.getcwd()))

   if check_flg:
      check()
      return 0

   if terminate_flg:
      terminate()
      return 0

   if (debug_level > 0): print("Debugging level set to %d" % debug_level)

   for arg in args:
      print(arg)

   signal.signal(signal.SIGTERM, sig_term)

   init()

   init_connection()

   dispatcher()

   return 0

#---------------------------------------------------------------------

if __name__ in ('__main__', sys.argv[0]):
   # Script entry: exit with main()'s status.  Ctrl-C goes through
   # shutdown() rather than ending in a traceback.
   try:
      sys.exit(main())
   except KeyboardInterrupt:
      print("[dserver]  Interrupted!")
      shutdown()

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation

Problems to fix:

To Do:

Issues:

"""

tst.py

#!/usr/bin/env python
#
#       Author:  Peter Harding  <plh@pha.com.au>
#                PerformIQ Pty. Ltd.
#                Level 6,
#                179 Queen Street,
#                MELBOURNE, VIC, 3000
# 
#                Phone:   03 9641 2222
#                Fax:     03 9641 2200
#                Mobile:  0418 375 085
# 
#          Copyright (C) 1994-2008, Peter Harding
#                        All rights reserved
#
#
#---------------------------------------------------------------------

"""
Test example of use of Data Server.

  Usage: 

    # tst.py -t <table> [-k <key>]

      The '-t <table>' option is used to specify the name
      of the table to query

      The '-i <index>' specifies the index for the indexed
      data type.  Indexes may be either an integer, which
      returns the ith element of a string - in which case
      the key to the data set must be a string - or, with
      '-I <index>', a string.

      The '-k <key>'   specifies the key for the keyed data
      type


"""

#---------------------------------------------------------------------

import os
import sys
import getopt

import client

#---------------------------------------------------------------------

# Version / identification strings.
__cvsid__   = "$Id:$"
__version__ = "2.0.2"
__id__      = "@(#) [2.0.2] tst.py 2008-05-10"

#---------------------------------------------------------------------

# Port of the data server to connect to; '-p' overrides it.
PORT                      = 9572

# Name of the table (data source) to query; '-t' overrides it.
table_name                = "Address"

# Set by '-i' (numeric index) or '-I' (string index).
indexed                   = False
index                     = None

# Set by '-k <key>'.
keyed                     = False
key                       = None

# Set by '-s <data>': store the data instead of fetching.
store_flg                 = False
store_data                = None

# Flags declared global in main(); term_flg is set by '-T' and
# verbose_flg by '-v'.
debug_flg                 = False
term_flg                  = False
verbose_flg               = False

#---------------------------------------------------------------------

def process():
   """
   Run one exchange with the data server, using the module-level
   settings filled in by main().

   The table is registered first.  With store_flg set, store_data is
   then stored (keyed or CSV); otherwise the next keyed, indexed or
   sequential item is fetched and its fields are printed.

   Returns 1 when no connection object is available, otherwise 0.
   """
   ds = client.Connection(port=PORT)

   if ds is None:
      print("Connection to data server failed - is data server process running?\n")
      return 1

   (type_ref, attributes)  = ds.RegisterType(table_name)

   pid      = os.getpid()

   print("My PID is %d" % pid)

   print("Data type \"%s\" registered as %d with attributes %s" % (table_name,  type_ref, repr(attributes)))

   if store_flg:
      if keyed:
         ds.StoreKeyedData(type_ref, key, store_data)
      else:
         ds.StoreCsvData(type_ref, store_data)
      return 0

   if keyed:
      sp  = ds.GetNextKeyed(type_ref, key)
   elif indexed:
      sp  = ds.GetNextIndexed(type_ref, index)
   else:
      sp  = ds.GetNext(type_ref)

   if sp:
      print("Buffer is \"%s\"" % sp)

   if sp is None:
      # The format has a single %d; passing (pid, type_ref) raised
      # TypeError instead of printing the message.
      print("Type %d exhausted" % type_ref)
   elif len(sp) > 0:
      for i, field in enumerate(sp):
         print("Field %2d: \"%s\"" % (i, field))
   else:
      print("Field: \"%s\"" % None)

   return 0

#---------------------------------------------------------------------

def main():
   """
   Parse the command line into the module-level settings and run
   process().

   Options:
      -d          raise the debug level by one (repeatable)
      -D <n>      set the debug level to n
      -i <n>      use the indexed type with numeric index n
      -I <index>  use the indexed type with a string index
      -k <key>    use the keyed type with the given key
      -p <port>   data server port
      -s <data>   store data instead of fetching
      -t <table>  table name
      -T          set term_flg
      -v          verbose
      -V          print the version and return
      -h, -?      print the module documentation and return

   Returns the exit status: 1 for bad options, 0 after -V / -h / -?,
   otherwise whatever process() returns.
   """
   global debug_flg
   global term_flg
   global verbose_flg
   global indexed
   global index
   global keyed
   global key
   global table_name
   global store_flg
   global store_data
   global PORT

   try:
      opts, args = getopt.getopt(sys.argv[1:], "dD:hi:I:k:p:s:t:TvV?")
   except getopt.error:
      print(__doc__)
      return 1

   # Kept local: this module defines no 'debug_level', so the old
   # 'debug_level += 1' raised UnboundLocalError on -d.
   debug_level = 0

   for o, a in opts:
      if o == '-d':
         debug_level     += 1
      elif o == '-D':
         debug_level      = int(a)
      elif o == '-i':            # Assuming a numeric offset!
         indexed      = True
         index        = int(a)
      elif o == '-I':            # Assuming a string index!
         indexed      = True
         index        = a
      elif o == '-k':
         keyed        = True
         key          = a
      elif o == '-p':
         PORT         = int(a)
      elif o == '-t':
         table_name   = a
      elif o == '-T':
         term_flg     = True
      elif o == '-s':
         print("storing...")
         store_flg    = True
         store_data   = a
      elif o == '-v':
         verbose_flg  = True
      elif o == '-V':
         print("Version: %s" % __version__)
         return 0
      elif o in ('-h', '-?'):
         print(__doc__)
         return 0

   debug_flg = debug_level > 0

   for arg in args:
      print(arg)

   # Hand process()'s status back so a failed connection gives a
   # non-zero exit code.
   return process()

#---------------------------------------------------------------------

if __name__ in ('__main__', sys.argv[0]):
   # Script entry: exit with whatever status main() returns.
   status = main()
   sys.exit(status)

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation

Problems to fix:

To Do:

Issues:

"""

client.py

#!/usr/bin/env python
#
#       Author:  Peter Harding  <plh@performiq.com.au>
#                PerformIQ Pty. Ltd.
#                Level 6, 170 Queen Street,
#                MELBOURNE, VIC, 3000
# 
#                Phone:   03 9641 2222
#                Fax:     03 9641 2200
#                Mobile:  0418 375 085
# 
#          Copyright (C) 1994-2008, Peter Harding
#                        All rights reserved
#
#---------------------------------------------------------------------

"""
Purpose:

  Python implementation of DataServer client API

  Usage:

     ds = client.Connection(port=PORT)

     if (ds == None):
        print("Connection to data server failed - is data server process running?\n")
        return 1

     (type_ref, attributes)  = ds.RegisterType(table_name)

  Then one of:

    a)  Pulling data:

          if Keyed:
              sp  = ds.GetNextKeyed(type_ref, key)
          elif Indexed:
              sp  = ds.GetNextIndexed(type_ref, index)
          else:
              sp  = ds.GetNext(type_ref)

    b)  Storing data:

          if Keyed:
             ds.StoreKeyedData(type_ref, key, store_data)
          else:
             ds.StoreCsvData(type_ref, store_data)

   Notes:

   i)    For an indexed type the attributes returned are:

         {
           'type'     : 'Indexed',
           'no_items' : <NO>
         }


"""

#---------------------------------------------------------------------

import os
import sys
import copy
import getopt
import marshal

#---------------------------------------------------------------------

from socket import *        # portable socket interface plus constants

#---------------------------------------------------------------------

# Release identification (the CVS keyword and the "@(#)" what-string are
# kept as plain literals).
__version__ = "2.0.2"
__id__ = "@(#) [2.0.2] client.py 2008-05-10"
__cvsid__ = "$Id:$"

# Default endpoint used by Connection(server=..., port=...); main() can
# replace PORT through the -p option.
HOST = 'localhost'
PORT = 9572

# Diagnostics: Connection.Get() traces each exchange while debug_level > 0.
debug_level = 0
verbose_flg = False

#---------------------------------------------------------------------

class Connection:
   """
   Client-side handle on a data server session.

   Construction opens a TCP socket to server:port, sends the
   "INIT|Python" handshake and stores the unmarshalled reply in
   self.attributes.  Every request is a '|'-separated text command sent
   through Get(); record replies are split on DELIM.
   """

   DELIM          = ','     # separator applied to record replies
   ServerHostname = None    # server name, default to 'localhost'
   ServerPort     = None    # non-reserved port used by the server
   sockobj        = None    # connected socket; None after Close()
   Fields         = None    # sequence read by GetField(); not populated by this class

   def __init__(self, server=HOST, port=PORT, debug=0):
      """
      Initialize TCP/IP socket object and make connection to server:port

      debug is copied into the module-level debug_level.  If the connect
      fails the reason is written to stderr and the process exits with
      status 1 (SystemExit).
      """

      global debug_level

      self.ServerHostname = server
      self.ServerPort     = port
      debug_level         = debug

      self.sockobj = socket(AF_INET, SOCK_STREAM)

      try:
         self.sockobj.connect((self.ServerHostname, self.ServerPort))
      except error:
         # 'error' is socket.error, the name 'from socket import *' really
         # exports.  sys.exc_info() is used instead of a bound name so the
         # handler parses on every Python 2.x release as well as 3.x.
         e = sys.exc_info()[1]
         sys.stderr.write('[client]  Connect failed: ' + str(e) + '\n')
         self.Close()
         sys.exit(1)

      msg        = "INIT|Python"

      attributes = self.Get(msg)

      # Trace the raw handshake reply only when debugging, as Get() does.
      if debug_level > 0:
         print('[client::__init__]  attributes  "%s"' % attributes)

      self.attributes     = marshal.loads(attributes)
      self.sources        = {}

   #------------------------------------------------------------------

   def Get(self, s):
      """
      Send s to server and get back response

      Returns the data read from the socket, or None when the
      connection has already been closed.
      """

      if self.sockobj is None:
         return None

      # sendall() keeps writing until the whole request has gone out;
      # a bare send() may transmit only part of it.
      self.sockobj.sendall(s)

      # NOTE(review): a single recv(1024) truncates longer replies; the
      # protocol's framing is not visible here, so this is left as it was.
      data = self.sockobj.recv(1024)

      if debug_level > 0:
         print('[Client::Get]  Sent:  "%s"  Received: "%s"' % (s, data))

      return data

   #------------------------------------------------------------------

   def Close(self):
      "close socket to send eof to server"

      if self.sockobj is not None:
         self.sockobj.close()
         self.sockobj = None

   #------------------------------------------------------------------

   def RegisterType(self, type):
      """
      Register the data source named by type with the server.

      Returns (type_ref, attributes) taken from the "<ref>|<marshalled
      attributes>" reply and remembers the attributes in self.sources.
      Returns (-1, None) when the request fails at the socket level or
      no reply is received.
      """

      msg = "REG|%s" % type

      try:
         response = self.Get(msg)
      except error:
         response = None

      # No usable reply (socket failure, closed connection, empty read).
      if not response:
         return (-1, None)

      (type_ref, attributes) = response.split('|', 1)
      type_ref               = int(type_ref)
      attributes             = marshal.loads(attributes)

      self.sources[type_ref] = attributes

      return (type_ref, attributes)

   #------------------------------------------------------------------

   def GetNext(self, type_ref):
      "Send GETN for type_ref and return the reply split on DELIM"

      msg      = "GETN|%d" % type_ref
      csv_data = self.Get(msg)
      data     = csv_data.split(self.DELIM)

      return data

   #------------------------------------------------------------------

   def GetNextKeyed(self, type_ref, key):
      "Send GETK for type_ref/key and return the reply split on DELIM"

      msg      = "GETK|%d|%s" % (type_ref, key)
      csv_data = self.Get(msg)
      data     = csv_data.split(self.DELIM)

      return data

   #------------------------------------------------------------------

   def GetNextIndexed(self, type_ref, idx):
      "Send GETI for type_ref/idx and return the reply split on DELIM"

      msg      = "GETI|%s|%s" % (type_ref, idx)
      csv_data = self.Get(msg)
      data     = csv_data.split(self.DELIM)

      return data

   #------------------------------------------------------------------

   def _ReplyCode(self, reply):
      "Convert a store reply to an int; -1 when it is missing or not numeric"

      try:
         return int(reply)
      except (TypeError, ValueError):
         return -1

   #------------------------------------------------------------------

   def StoreCsvData(self, type_ref, data):
      "Send STOC with data for type_ref and return the integer reply (-1 if unusable)"

      msg     = "STOC|%d|%s" % (type_ref, data)
      reply   = self.Get(msg)

      return self._ReplyCode(reply)

   #------------------------------------------------------------------

   def StoreKeyedData(self, type_ref, key_ref, data):
      "Send STOK with data for type_ref/key_ref and return the integer reply (-1 if unusable)"

      msg     = "STOK|%d|%s|%s" % (type_ref, key_ref, data)
      reply   = self.Get(msg)

      return self._ReplyCode(reply)

   #------------------------------------------------------------------

   def GetField(self, type_ref, i):
      """
      Return entry i of self.Fields, or None when Fields is unset or i
      is out of range.  type_ref is accepted but not used.
      """

      fields = self.Fields

      if fields is not None and 0 <= i < len(fields):
         return fields[i]

      return None

#---------------------------------------------------------------------

def main():
   """
   Command-line entry point: parse the options, then echo any arguments.

   Options:
      -d        increment debug_level
      -D <n>    set debug_level to <n>
      -p <n>    set PORT to <n>
      -v        set verbose_flg
      -V        print the version and return
      -h, -?    print the module documentation and return

   Returns 1 for an unrecognised option, 0 after -V/-h/-?, and None
   otherwise (sys.exit() treats None as success).
   """

   global debug_level
   global verbose_flg
   global PORT

   try:
      opts, args = getopt.getopt(sys.argv[1:], "dhD:p:vV?")
   except getopt.error:
      # Say what was wrong before showing the usage text.  sys.exc_info()
      # keeps the handler valid on every Python 2.x release and on 3.x.
      sys.stderr.write('[client]  %s\n' % sys.exc_info()[1])
      print(__doc__)
      return 1

   for o, a in opts:
      if o == '-d':
         debug_level       += 1
      elif o == '-D':
         debug_level        = int(a)
      elif o == '-p':
         PORT               = int(a)
      elif o == '-v':
         # Assign the declared global; 'verboseFlg' only created a local.
         verbose_flg        = True
      elif o == '-V':
         print("Version: %s" % __version__)
         return 0
      elif o in ('-h', '-?'):
         print(__doc__)
         return 0

   for arg in args:
      print("[client] %s" % arg)

#---------------------------------------------------------------------

# Script entry point: run main() and pass its return value to sys.exit()
# as the process exit status.  Besides the usual '__main__' test the guard
# also fires when __name__ equals sys.argv[0].
# NOTE(review): the second test is presumably for a launcher that names
# the module after the script path - confirm before simplifying.
if __name__ == '__main__' or __name__ == sys.argv[0]:
   sys.exit(main())

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation
   20080510   plh   Refactored as client and Connection rather than dcl

Problems to fix:

To Do:

Issues:

"""


Sat May 10 13:35:23 AUSEST 2008
===============================

Have added in the return of attributes with RegisterType to allow multiple values
to be passed back from the data server.  Specifically, I wanted to be able to get
number of records (Size) in Indexed data sources.  This will allow vusers
implemented in Python to retrieve more information about the data sources.

Initially, I wanted to be able to select a random group for the LDAP test.

I have also refactored the scripts and reorganised some of the header info.

i.e.

       dvstst.py     ->    tst.py
       dcl.py        ->    client.py